ThreadPoolExecutor源码剖析
ThreadPoolExecutor源码剖析
自定义构建线程池,可以细粒度的控制线程池,去管理内存的属性,并且针对一些参数的设置可能更好的在后期排查问题。这也是阿里巴巴开发手册推荐我们这样做的。
先看一下ThreadPoolExecutor提供的七个核心参数
public ThreadPoolExecutor(int corePoolSize, // 核心工作线程(当前任务执行结束后,不会被销毁)
int maximumPoolSize, // 最大工作线程(代表当前线程池中,一共可以有多少个工作线程)
long keepAliveTime,// 非核心工作线程在阻塞队列位置等待的时间
TimeUnit unit,// 非核心工作线程在阻塞队列位置等待时间的单位
BlockingQueue<Runnable> workQueue,// 任务在没有核心工作线程处理时,任务先扔到阻塞队列中
ThreadFactory threadFactory,// 构建线程的线程工厂,可以设置thread的一些信息
RejectedExecutionHandler handler) { // 当线程池无法处理投递过来的任务时,执行当前的拒绝策略
// 初始化线程池的操作,省略...
}
ThreadPoolExecutor常见属性讲解:
// ctl高3位表示线程池状态,低29位表示工作线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 声明了一个常量:COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY就是当前工作线程能记录的工作线程的最大个数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态
// runState is stored in the high-order bits
// 111:代表RUNNING状态,RUNNING可以处理任务,并且处理阻塞队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
// 000:代表SHUTDOWN状态,不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完。
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001:代表STOP状态,不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管。
private static final int STOP = 1 << COUNT_BITS;
// 010:代表TIDYING状态,这个状态是由SHUTDOWN或者STOP转换过来的,代表当前线程池马上关闭,就是过渡状态
private static final int TIDYING = 2 << COUNT_BITS;
// 011:代表TERMINATED状态,这个状态是TIDYING状态转换过来的,转换过来只需要执行一个terminated方法。
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 基于&运算的特点,保证只会拿到ctl高三位的值
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 基于&运算的特点,保证只会拿到ctl低29位的值。
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池状态的特点以及转换方式:
execute源码的分析
public void execute(Runnable command) {
// 提交过来的任务不能为null
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 如果工作线程数<核心线程数
if (workerCountOf(c) < corePoolSize) {
// 将任务添加到工作队列,如果添加成功,直接返回,
// 参数:true:核心线程,false:非核心线程
if (addWorker(command, true))
return;
// 如果添加失败,再重新获取一下线程池的状态和工作线程的数量
c = ctl.get();
}
// 如果当前线程池为运行状态,则将任务添加到工作队列中
if (isRunning(c) && workQueue.offer(command)) {
// 再重新获取一下线程池的状态和工作线程的数量
int recheck = ctl.get();
// 如果当前线程池不是运行状态,那么就将当前任务从队列中移除出去
if (! isRunning(recheck) && remove(command))
// 使用拒绝策略,拒绝当前任务
reject(command);
// 如果当前工作线程的个数为0,则添加
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 将当前任务放到非核心线程中去处理
else if (!addWorker(command, false))
// 如果非核心线程处理不了就执行拒绝策略
reject(command);
}
总结一下:任务来了,先交由核心线程进行处理,如果核心线程处理不过来的话,就扔到队列中,如果队列中满了,则交由非核心线程进行处理,如果非核心线程也处理不过来了,则执行拒绝策略。当然这是线程池的默认执行步骤,由于这个方法没有被final进行修饰,我们是可以重写这个方法的,来修改线程池的执行步骤。
addWorker源码剖析:
// 核心线程和非核心线程的创建都是基于这个方法创建的。
private boolean addWorker(Runnable firstTask, boolean core) {
retry: // 跳出双层for循环的一种方式
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果线程池状态不是RUNNING.直接return false。
if (rs >= SHUTDOWN &&
// 第二个判断,解决在shutdown状态下,没有工作线程,要构建一个线程处理阻塞队列中任务的情况
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
// 说白了,以下是线程池状态是RUNNING的情况下才会执行的
for (;;) {
int wc = workerCountOf(c);
// 工作线程个数不能超过最大值
if (wc >= CAPACITY ||
// true:判断核心线程数 false:判断最大线程数(也就是非核心线程数)
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 为了避免出现多线程并发创建工作线程,导致破坏设置的核心参数
if (compareAndIncrementWorkerCount(c))
// 如果成功,跳出最外层循环。
break retry;
c = ctl.get(); // Re-read ctl
// 如果当前线程状态不一致了,说明出现了问题,也要跳出最外层循环。
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 创建工作线程并将任务封装到worker对象中
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将worker对象放到hashset集合中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 调用Thread的start方法启动线程执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}