线程池实现原理

Java 并发编程的艺术第2版学习笔记

原书:《Java 并发编程的艺术第2版》 | 作者:方腾飞 魏鹏 程晓明 | 2023 年 9 月 | 机械工业出版社
1.4.3 线程池技术及其示例 | 27 页

核心组件

任务队列「jobs」

存放执行任务对象

工作队列「workers」

纯线程池:存放工作线程对象,创建好的线程都放在这里。

常用参数

  • 线程池默认的数量
  • 线程池最大限制数
  • 线程池最小的数量
  • 工作队列「workers」存放线程的数量

工作原理

  1. 工作线程「Worker」在 while 循环中获取任务队列jobs」中的 job,获取不到时使用 jobs.wait() 方法等待
  2. 任务队列jobs」中添加新任务后,会调用 jobs.notify() 方法唤醒工作线程Worker
  3. 工作线程Worker」从任务队列jobs」中获取到任务后执行任务
  4. 任务执行完毕后,开始新一轮循环:从第 1 步开始

示例代码

ThreadPool

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public interface ThreadPool<Job extends Runnable> {

    void execute(Job job);

    void shutdown();

    void addWorkers(int num);

    void removeWorkers(int num);

    int getJobSize();
}

DefaultThreadPool

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    // 线程池最大限制数
    private static final int MAX_WORKER_NUMBERS = 10;
    // 线程池默认的数量
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    // 线程池最小的数量
    private static final int MIN_WORKER_NUMBERS = 1;
    // 这是一个工作列表,将会向里面插入工作
    private final LinkedList<Job> jobs = new LinkedList<>();
    // 工作者列表
    private final LinkedList<Worker> workers = new LinkedList<>();
    // 线程编号生成
    private final AtomicLong threadNum = new AtomicLong();
    // 工作者线程的数量
    private int workerNum = DEFAULT_WORKER_NUMBERS;

    public DefaultThreadPool() {
        initializeWorkers(DEFAULT_WORKER_NUMBERS);
    }

    public DefaultThreadPool(int num) {
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
        initializeWorkers(workerNum);
    }


    @Override
    public void execute(Job job) {
        if (job != null) {
            synchronized (jobs) {
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }

    @Override
    public void shutdown() {
        removeWorkers(workerNum);
    }

    @Override
    public synchronized void addWorkers(int num) {
        // 限制新增的 Worker 数量不能超过最大值
        if (num + this.workerNum > MAX_WORKER_NUMBERS) {
            num = MAX_WORKER_NUMBERS - this.workerNum;
        }
        initializeWorkers(num);
        this.workerNum += num;
    }

    @Override
    public synchronized void removeWorkers(int num) {
        if (num >= this.workerNum) {
            throw new IllegalArgumentException("beyond workNum");
        }
        // 按照给定的数量停止 Worker
        int count = 0;
        while (count < num) {
            workers.removeFirst().shutdown();
            count++;
        }
        this.workerNum -= count;
    }

    @Override
    public int getJobSize() {
        return jobs.size();
    }

    private void initializeWorkers(int num) {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }

    class Worker implements Runnable {
        private volatile boolean running = true;

        @Override
        public void run() {
            while (running) {
                Job job = null;
                synchronized (jobs) {
                    // 如果工作列表是空的,那么就 wait
                    while (jobs.isEmpty()) {
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            // 感知到外部对 WorkerThread 的中断操作,返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    // 取出一个 Job
                    job = jobs.removeFirst();
                }
                if (job != null) {
                    try {
                        job.run();
                    } catch (Exception e) {
                        // 忽略 Job 执行中的 Exception
                        e.printStackTrace();
                    }
                }
            }
        }

        public void shutdown() {
            running = false;
        }
    }
}
comments powered by Disqus