线程池源码分析

线程池及简单介绍

(1)线程池类关系图

线程池类关系图.png

Executors 是对线程执行的工具类,可以看做是线程池的工厂。
execute() 方法任务立即执行,submit() 方法提交任务等待线程自己调度运行。

(2)线程池核心参数的交互

  • corePoolSize 核心线程数
  • maximumPoolSize 最大线程数
  • keepAliveTime 生存时间,线程池中超过核心线程数大小的线程的存活时间,如果设置 allowCoreThreadTimeOuttrue,那么核心线程数也是此时间的存活时间
  • unit 生存时间的单位,类型是 TimeUnit
  • workQueue 任务队列 BlockingQueue<Runnable>
  • threadFactory 线程工厂 ThreadFactory
  • handler 拒绝策略, 类型是 RejectedExecutionHandler

首先线程池中会保持 corePoolSize 大小的线程运行,即使没有任务也会空闲运行,但是如果设置了 allowCoreThreadTimeOut = true,
那么核心线程也会在 keepAliveTime 时间大小之后关闭。当任务超过了核心线程数的话,会把新的任务放到工作队列,
如果工作队列满了,并且当前的工作线程是小于 maximumPoolSize 定义的最大线程数的,那么创建一个新线程来运行任务,
当运行的线程数超过了最大线程数的值,拒绝策略开始发挥作用,默认是丢弃策略。

终于来到了我们的线程池的源码解析啦,因为我们知道jdk自带的各种线程池本质上都是核心类 ThreadPollExecutor 构造出来的,所以我们来看里面到到底是个
什么鬼?

源码分析

  • 成员变量

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    public class ThreadPoolExecutor extends AbstractExecutorService {

    // 高 3 位表示线程池状态,低29位表示 worker 的数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3; // 32 - 3 = 29

    /**
    * 0010 0000 0000 0000 0000 0000 0000 0000 COUNT_BITS 二进制值
    - 0000 0000 0000 0000 0000 0000 0000 0001 1 二进制值
    ————————————————————————————————————————————
    = 0001 1111 1111 1111 1111 1111 1111 1111 COUNT_MASK 二进制值
    * */
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // 1 * 2^29 -1

    // runState is stored in the high-order bits
    private static final int RUNNING = -1 << COUNT_BITS; // 表示接收新任务和处理队列中的任务
    private static final int SHUTDOWN = 0 << COUNT_BITS; // 表示不接收新任务了,但是会处理队列中的任务
    private static final int STOP = 1 << COUNT_BITS; // 表示不接收新任务,也不处理队列中的任务,中断进行中的任务
    private static final int TIDYING = 2 << COUNT_BITS; // 如果所有的任务的已经结束,工作线程是0,此时的线程状态转变为 TIDYING,调用 terminated() 钩子方法
    private static final int TERMINATED = 3 << COUNT_BITS; // 表示 terminated() 方法执行完成

    // Packing and unpacking ctl
    // ~COUNT_MASK 值: 1110 0000 0000 0000 0000 0000 0000 0000
    // 按位与,低 29 位都是 0
    private static int runStateOf(int c) { return c & ~COUNT_MASK; }

    // COUNT_MASK 二进制值 : 0001 1111 1111 1111 1111 1111 1111 1111
    // 按位与,高 3 位都是 0
    private static int workerCountOf(int c) { return c & COUNT_MASK; }

    // 根据线程池状态和线程数量计算 control(ctl) 的值
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    }

    ctl 这个原子整形变量,包含了两个含义,一个是 workerCount,就是线程池里面 worker 数量,另一个是 runState,就是线程池状态,运行还是停止等等。咦,一个变量怎么表示
    的两种意思呢?有一个很巧妙的方法就是高3 位表示线程池的状态,低 29 位来表示线程池中的线程数量,目前来说是29位数量足够的,如果不够,后面会扩展成 Long 类型就可以,这样
    在高并发的情况减少变量的锁同步。具体怎么做到的呢?先来看看线程池状态的表示,首先是 RUNNING 状态:

    private static final int RUNNING = -1 << COUNT_BITS;

    我们知道 COUNT_BITS 是 29, -1 的二进制值表示是 1 的二进制值,取反,然后加 1 ,就是 -1 的二进制表示。int 是 32 位,那么:

    0000 0000 0000 0000 0000 0000 0000 0001 1 的二进制
    1111 1111 1111 1111 1111 1111 1111 1110 取反
    1111 1111 1111 1111 1111 1111 1111 1111 加 1 即为 -1 的二进制表示

    RUNNING 状态是左移 29 位,那么:

    1111 1111 1111 1111 1111 1111 1111 1111 -1 的二进制表示
    1110 0000 0000 0000 0000 0000 0000 0000 左移 29 后的值

    RUNNING 的值是:1110 0000 0000 0000 0000 0000 0000 0000

    同理得到:
    SHUTDOWN   的值是 0000 0000 0000 0000 0000 0000 0000 0000
    STOP     的值是 0010 0000 0000 0000 0000 0000 0000 0000
    TIDYING     的值是 0100 0000 0000 0000 0000 0000 0000 0000
    TERMINATED  的值是 0110 0000 0000 0000 0000 0000 0000 0000

    所以 ctl 变量的默认值是 RUNNING 的值 和 0 或运算,即 ctl = 1110 0000 0000 0000 0000 0000 0000 0000,实际上意思就是线程池在 RUNNING 状态,但是 0 个工作线程。
    通过方法 runStateOf() 可以拿到当前线程池的状态值。

    上面说的是线程池状态的表示,再来看一下线程池中线程的个数,变量 COUNT_MASK 就是表示线程数的大小,最多(2^29 - 1) 个,通过方法 workerCountOf() 可以计算线程的个数。

  • 构造方法

    构造方法主要是进行一些非空判断和校验。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public class ThreadPoolExecutor extends AbstractExecutorService {
    public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
    maximumPoolSize <= 0 ||
    maximumPoolSize < corePoolSize ||
    keepAliveTime < 0)
    throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
    throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime); // 将存活时间转换成纳秒
    this.threadFactory = threadFactory;
    this.handler = handler;
    }
    }
  • 提交任务的过程

    提交任务时,会调用 execute() 方法,我们来看一下这个方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    public class ThreadPoolExecutor extends AbstractExecutorService {
    public void execute(Runnable command) {
    if (command == null)
    throw new NullPointerException();
    /*
    * Proceed in 3 steps:
    *
    * 1. If fewer than corePoolSize threads are running, try to
    * start a new thread with the given command as its first
    * task. The call to addWorker atomically checks runState and
    * workerCount, and so prevents false alarms that would add
    * threads when it shouldn't, by returning false.
    *
    * 2. If a task can be successfully queued, then we still need
    * to double-check whether we should have added a thread
    * (because existing ones died since last checking) or that
    * the pool shut down since entry into this method. So we
    * recheck state and if necessary roll back the enqueuing if
    * stopped, or start a new thread if there are none.
    *
    * 3. If we cannot queue task, then we try to add a new
    * thread. If it fails, we know we are shut down or saturated
    * and so reject the task.
    */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true)) // 带了任务参数
    return;
    c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (!isRunning(recheck) && remove(command))
    reject(command);
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false); // 没有带任务参数
    } else if (!addWorker(command, false))
    reject(command);
    }
    }

    execute() 方法的思路注释文档说的很清楚,首先非空判断,然后拿到 ctl 的值,来判断:

    1. 值(默认是 0 )小于核心线程数的
      如果是小于就添加一个 Worker,这个 Worker 是 ThreadPoolExecutor 的内部类,简单来讲就是封装了线程信息和任务信息的类,后面再谈。
    2. 值大于核心线程数,
      • 2.1 再判断线程池状态是 RUNNING 状态,就添加到工作队列当中去
        • 2.1.1 这里注意的是还会再一次检查线程池的状态,以防线程池的状态在添加队列的过程中发生改变,如果发现线程池不是 RUNNING 状态的话,
          那么就把队列里面的任务 remove 掉,然后调用拒绝策略来拒绝,默认是丢弃。
        • 2.1.2 在 2.1.1 的基础判断上,如果线程池仍然是 RUNNING 的状态,但是 workerCountOf() 拿到的 worker 是 0,那么添加一个 worker,
          这里只在线程池里面增加一个线程。这里我们可以看到核心线程数满了之后,先添加到队列,如果线程池中的 worker 是 0 的话,那么会新加一个线程,
          核心线程会带着任务直接执行,而核心线程之外的线程是从队列里面取任务来执行的,注意 addWorker() 方法的调用。
      • 2.2 线程池状态并不是 RUNNING 状态的话,或者任务进入队列失败了,尝试创建worker执行任务,实际上 addWorker() 方法里面也是判断了
        线程池状态的,不是 RUNNING 状态的话直接返回 false,添加任务失败,触发 reject 策略。
  • addWorker 分析

    在上面添加任务的分析过程中,主要是调用 addWorker() 的方法,现在来窥探下 addWorker() 方法:

    点击展开:addWorker() 方法 >folded
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83

    public class ThreadPoolExecutor extends AbstractExecutorService {

    private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get(); ; ) {
    /**
    * 这里的 if 条件等价于
    * if(rs > SHUTDOWN ||
    * rs == SHUTDOWN && firstTask != null ||
    * rs == SHUTDOWN && workQueue.isEmpty()
    * )
    * 1. 线程池状态 > SHUTDOWN 时,直接返回false;
    * 2. 线程池状态 = SHUTDOWN 时,且 firstTask 不等于 null,直接返回false;
    * 3. 线程池状态 = SHUTDOWN 时,且工作队列是空的话,直接返回false;
    * */
    // Check if queue empty only if necessary.
    if (runStateAtLeast(c, SHUTDOWN)
    && (runStateAtLeast(c, STOP)
    || firstTask != null
    || workQueue.isEmpty()))
    return false;

    // 内存自旋
    for (; ; ) {
    // worker 超过容量,直接返回 false
    if (workerCountOf(c)
    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
    return false;
    // 使用 CAS 的方式增加 worker 数量,成功了跳出外层 for 循环
    if (compareAndIncrementWorkerCount(c))
    break retry;
    c = ctl.get(); // Re-read ctl
    // 如果线程状态发生变化外层自旋。
    if (runStateAtLeast(c, SHUTDOWN))
    continue retry;
    // else CAS failed due to workerCount change; retry inner loop
    }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    // Recheck while holding lock.
    // Back out on ThreadFactory failure or if
    // shut down before lock acquired.
    int c = ctl.get();

    // 重新检查线程池的状态
    if (isRunning(c) ||
    (runStateLessThan(c, STOP) && firstTask == null)) {
    if (t.getState() != Thread.State.NEW)
    throw new IllegalThreadStateException();
    workers.add(w);
    workerAdded = true;
    int s = workers.size();
    if (s > largestPoolSize)
    largestPoolSize = s;
    }
    } finally {
    mainLock.unlock();
    }
    // 启动 worker 的线程
    if (workerAdded) {
    t.start();
    workerStarted = true;
    }
    }
    } finally {
    if (!workerStarted)
    addWorkerFailed(w);
    }
    return workerStarted;
    }
    }

  这个方法的代码分为两部分,第一部分是 for() 循环,也就干了一件事把 workerCount 加 1,第二部分循环后执行的代码,目的是添加一个 Worker 对象。在第一
部分代码里面,主要就是第一层的循环判断当前的线程池状态,如果是大于 shutdown 的话就直接返回 false。第二层的循环,先去判断当前的线程数量是不是超过了 corePoolSize 和 maximumPoolSize,然后再用
CAS 的方式把 workerCount 加 1,这是第一部分代码的主要逻辑。
我们再来看看第二部分代码的逻辑,就是 new 一个 Worker 对象,然后把封装任务信息,添加到 workers 集合里面,因为涉及到多线程,肯定需要加锁的,这里用 ReentrantLock
来实现的,加入之后,启动这个线程,开始执行任务。addWorker() 这个方法的思路还是比较清晰的。
接下来我们来看一看 Worker 这个类:

点击展开:Worker 类 >folded
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

public class ThreadPoolExecutor extends AbstractExecutorService {
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}

}
}

  主要看一下构造方法,在构造方法里面最重要的就是把当前的 Worker 对象 this 当作参数进行 thread 的生成,实际上等价与 new Thread(runnable)。 因为 Worker 本身是继承了 AQS 类又实现了 Runnable 类的。所以我们要看一下 Worker 重写 run() 方法里面 runWorker() 方法。

  • runWorker 核心线程执行逻辑

    先来看一下 runWorker 的代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    public class ThreadPoolExecutor extends AbstractExecutorService {
    final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 为了能够让外部中断
    w.unlock(); // allow interrupts
    // 所有任务完成时标志,线程池正在结束
    boolean completedAbruptly = true;
    try {
    // 如果 firstTask 不为空就执行 first task,为空就去队列里面取。
    while (task != null || (task = getTask()) != null) {
    w.lock();
    // If pool is stopping, ensure thread is interrupted;
    // if not, ensure thread is not interrupted. This
    // requires a recheck in second case to deal with
    // shutdownNow race while clearing interrupt
    if ((runStateAtLeast(ctl.get(), STOP) ||
    (Thread.interrupted() &&
    runStateAtLeast(ctl.get(), STOP))) &&
    !wt.isInterrupted())
    wt.interrupt();
    try {
    // 空实现,为了扩展
    beforeExecute(wt, task);
    try {
    task.run();
    afterExecute(task, null);
    } catch (Throwable ex) {
    afterExecute(task, ex);
    throw ex;
    }
    } finally {
    // 帮助 GC
    task = null;
    w.completedTasks++;
    w.unlock();
    }
    }
    completedAbruptly = false;
    } finally {
    processWorkerExit(w, completedAbruptly);
    }
    }
    }

    上面我们提到 Worker 这个类既继承了 AQS 类又实现了 Runnable 类,那么它本身就是一把锁,就可以做同步。很多线程都会去 new Worker(),然后去放自己的
    任务,所以给自己加了一把锁,然后只执行自己的任务,如果不加锁,别的线程进来之后有可能让它执行其他的任务,所以这个时候要枷锁。这是自己继承 AQS,不用 new
    其他的锁了,自己 lock 就行了。这一部分实际上就是执行任务,加了很多状态的判断逻辑。

    总结来说,首先提交任务 submit (返回 Future 对象) 或者 execute 任务,然后核心线程池够不够,启动核心的,
    核心线程够了就加入阻塞队列,队列满了,就添加非核心线程,一直到的最大线程数,然后再执行拒绝策略。

作者

操先森

发布于

2021-09-16

更新于

2021-09-16

许可协议

评论