ThreadPoolExecutor源码剖析

~码铃薯~ / 2024-08-31 / 原文

ThreadPoolExecutor源码剖析

自定义构建线程池,可以细粒度的控制线程池,去管理内存的属性,并且针对一些参数的设置可能更好的在后期排查问题。这也是阿里巴巴开发手册推荐我们这样做的。

先看一下ThreadPoolExecutor提供的七个核心参数

public ThreadPoolExecutor(int corePoolSize, // 核心工作线程(当前任务执行结束后,不会被销毁)
                          int maximumPoolSize, // 最大工作线程(代表当前线程池中,一共可以有多少个工作线程)
                          long keepAliveTime,// 非核心工作线程在阻塞队列位置等待的时间
                          TimeUnit unit,// 非核心工作线程在阻塞队列位置等待时间的单位
                          BlockingQueue<Runnable> workQueue,// 任务在没有核心工作线程处理时,任务先扔到阻塞队列中
                          ThreadFactory threadFactory,// 构建线程的线程工厂,可以设置thread的一些信息
                          RejectedExecutionHandler handler) { // 当线程池无法处理投递过来的任务时,执行当前的拒绝策略
	// 初始化线程池的操作,省略...
}

ThreadPoolExecutor常见属性讲解:

// ctl高3位表示线程池状态,低29位表示工作线程的个数    
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 声明了一个常量:COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY就是当前工作线程能记录的工作线程的最大个数
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;


// 线程池状态
// runState is stored in the high-order bits
// 111:代表RUNNING状态,RUNNING可以处理任务,并且处理阻塞队列中的任务
private static final int RUNNING    = -1 << COUNT_BITS;
// 000:代表SHUTDOWN状态,不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完。
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001:代表STOP状态,不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管。
private static final int STOP       =  1 << COUNT_BITS;
// 010:代表TIDYING状态,这个状态是由SHUTDOWN或者STOP转换过来的,代表当前线程池马上关闭,就是过渡状态
private static final int TIDYING    =  2 << COUNT_BITS;
// 011:代表TERMINATED状态,这个状态是TIDYING状态转换过来的,转换过来只需要执行一个terminated方法。
private static final int TERMINATED =  3 << COUNT_BITS;


// Packing and unpacking ctl
// 基于&运算的特点,保证只会拿到ctl高三位的值
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 基于&运算的特点,保证只会拿到ctl低29位的值。
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池状态的特点以及转换方式:

image

execute源码的分析

public void execute(Runnable command) {
    // 提交过来的任务不能为null
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    // 如果工作线程数<核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 将任务添加到工作队列,如果添加成功,直接返回,
        // 参数:true:核心线程,false:非核心线程
        if (addWorker(command, true))
            return;
        // 如果添加失败,再重新获取一下线程池的状态和工作线程的数量
        c = ctl.get();
    }
    // 如果当前线程池为运行状态,则将任务添加到工作队列中
    if (isRunning(c) && workQueue.offer(command)) {
        // 再重新获取一下线程池的状态和工作线程的数量
        int recheck = ctl.get();
        // 如果当前线程池不是运行状态,那么就将当前任务从队列中移除出去
        if (! isRunning(recheck) && remove(command))
            // 使用拒绝策略,拒绝当前任务
            reject(command);
        // 如果当前工作线程的个数为0,则添加
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 将当前任务放到非核心线程中去处理 
    else if (!addWorker(command, false))
        // 如果非核心线程处理不了就执行拒绝策略
        reject(command);
}

总结一下:任务来了,先交由核心线程进行处理,如果核心线程处理不过来的话,就扔到队列中,如果队列中满了,则交由非核心线程进行处理,如果非核心线程也处理不过来了,则执行拒绝策略。当然这是线程池的默认执行步骤,由于这个方法没有被final进行修饰,我们是可以重写这个方法的,来修改线程池的执行步骤。

addWorker源码剖析:

// 核心线程和非核心线程的创建都是基于这个方法创建的。
private boolean addWorker(Runnable firstTask, boolean core) {
    retry: // 跳出双层for循环的一种方式
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 如果线程池状态不是RUNNING.直接return false。
        if (rs >= SHUTDOWN &&
            // 第二个判断,解决在shutdown状态下,没有工作线程,要构建一个线程处理阻塞队列中任务的情况
            ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;

        // 说白了,以下是线程池状态是RUNNING的情况下才会执行的
        for (;;) {
            int wc = workerCountOf(c);
            // 工作线程个数不能超过最大值
            if (wc >= CAPACITY ||
                // true:判断核心线程数 false:判断最大线程数(也就是非核心线程数)
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 为了避免出现多线程并发创建工作线程,导致破坏设置的核心参数
            if (compareAndIncrementWorkerCount(c))
                // 如果成功,跳出最外层循环。
                break retry;
            c = ctl.get();  // Re-read ctl
            // 如果当前线程状态不一致了,说明出现了问题,也要跳出最外层循环。
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask); // 创建工作线程并将任务封装到worker对象中
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 将worker对象放到hashset集合中
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 调用Thread的start方法启动线程执行任务
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}