线程池状态
状态
- RUNNING
- 该状态接受新的任务同时处理队列中的任务
- SHUTDOWN
- 该状态不再接受新的任务,但是队列中的任务继续执行
- STOP
- 不再接受新的任务,也不再处理队列中的任务,并且会中断正在运行的任务
- TIDYING
- 所有任务都已经终止,工作线程数为0。调用terminated()方法状态会变为TIDYING
- TERMINATED
- terminated()方法执行完成
状态转移
- RUNNING -> SHUTDOWN
- 执行shutdown()方法(SHUTDOWN状态可能立即结束进入下一状态)
- (RUNNING or SHUTDOWN) -> STOP
- 执行shutdownNow()方法
- SHUTDOWN -> TIDYING
- 当所有任务队列和线程池(pool)都空的了时候
- STOP -> TIDYING
- 当线程池(pool)为空的时候
- TIDYING -> TERMINATED
- terminated()方法完成
awaitTermination()在状态变为TERMINATED的时候返回
- terminated()方法完成
状态位表示
状态位在线程池中使用一个原子类型的Integer进行存储
1 | private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); |
Integer的长度是32位,用全部32位来表示线程数量,有点浪费。线程池的状态一共就5种,所以大神决定用ctl的高3位(可以表示8种状态了),来表示线程池的状态,低29位用来计数(大约500_000_000),反正就目前机器来说,想同时开启这么多线程。。。机器早就挂了,所以足够了
几个底层依赖的方法
- private static int runStateOf(int c) { return c & ~CAPACITY; }
- 通过位运算,获取当前线程池的状态
- private static int workerCountOf(int c) { return c & CAPACITY; }
- 通过位运算,获取当前线程池的工作线程数
- private static int ctlOf(int rs, int wc) { return rs | wc; }
- 为了看ctl的状态
- 然后就是几个关于ctl的cas操作,包括增加一个线程计数,减少一个线程计数
几个重要的Field
- private final BlockingQueue
workQueue; - 关键参数,设置线程池corePoolSize满了以后,要将任务放到什么样的阻塞队列中。
- private final ReentrantLock mainLock = new ReentrantLock();
- 内部锁,在很多方法中均有用到,包括添加Worker,中断Worker,shutdown,以及获取各种size都需要锁进行同步保护
- private final HashSet
workers = new HashSet (); - 可以理解为线程的集合,Worker 继承了 AQS 并且实现了 Runnable,也就是线程池中对应的线程的集合。
- private final Condition termination = mainLock.newCondition();
- 在 tryTerminate() 方法中进行通知,awaitTermination(long timeout, TimeUnit unit)方法中进行awaitNanos,也就是说调用awaitTermination方法之后,会一直等待,直到tryTerminate方法执行并且通知,才会结束(这时候线程池应该就变为TERMINATED状态了)
- private int largestPoolSize;
- 记录线程池中线程数量曾经达到过的最大值。
- private long completedTaskCount;
- 这个从字面意思就很好立即了,已经完成的任务数量
- private volatile ThreadFactory threadFactory;
- 又一个核心参数,线程的创建工厂,各种大厂的Java开发规范都需要业务自己实现对应的线程工厂,定义线程的名称之类的,主要是后期排查多线程问题的时候方便定位。
- private volatile RejectedExecutionHandler handler;
- 也是核心参数,当线程池达到饱和状态(队列满了,maximumPoolSize也达到了),如果还在继续提交的任务,就依靠这个进行处理。
- 默认有4个实现类,如下:
- CallerRunsPolicy,这个就是将任务返回给调用方,由调用方执行。
- AbortPolicy,这个比较粗暴,直接抛出异常。也是默认实现,参考defaultHandler
- DiscardPolicy,这个应该很少用,直接丢弃提交的任务,Do Nothing
- DiscardOldestPolicy,这个也是丢弃,不过是丢弃队列中最早的一个(直接调用peeK)
- private volatile long keepAliveTime;
- 核心参数之一,线程空闲多久后回收(默认如果小于corePoolSize,则不进行回收)
- private volatile boolean allowCoreThreadTimeOut;
- allowCoreThreadTimeOut,core线程是否超时后回收,默认是false
- private volatile int corePoolSize;
- 核心参数,core线程池大小
- private volatile int maximumPoolSize;
- 核心参数,最大线程池大小(超过这个值就会调用RejectedExecutionHandler)
- private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
- 默认Rejected处理策略,抛出异常
- private static final RuntimePermission shutdownPerm = new RuntimePermission(“modifyThread”);
- 这个真不太清楚,只知道是在调用shutdown相关方法会调用进行安全检查
- private final AccessControlContext acc;
- 在finalize方法中有调用,看不太明白 = =,貌似也是权限访问层面的(AccessController.doPrivileged(pa, acc);)
几个重要的 public 方法
public void execute(Runnable command)
- 提交一个Runnable任务,描述很简单,实现应该是所有方法中最复杂的了,话不多说,直接上源码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();//不允许提交null
int c = ctl.get();
//检查目前工作线程数量是否超过了corePoolSize,没有超过的话直接添加任务,添加成功就返回,添加失败则更新状态值c然后继续
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))//参数为true则添加的是core线程
return;
c = ctl.get();
}
//执行到这里肯定是添加worker失败或者已经达到了corePoolSize
//这时候检查线程状态,确保是Running(因为有可能这期间其他线程调用了shutdown等方法),就开始往队列中添加任务。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//添加到队列之后,再次检查线程池状态,如果状态发生变化,则移除任务并执行拒绝策略
//如果状态没有发生改变,此时如果线程池为空,那就添加一个非核心Worker
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}//到这里说明添加队列失败,要么是线程池编程非RUNNING状态,要么队列满了,则添加非核心线程,非核心线程如果添加还是失败了,就只能执行拒绝策略了
else if (!addWorker(command, false))
reject(command);
}
//上面的步骤还不算复杂,接下就是最复杂的addWorker方法了。
private boolean addWorker(Runnable firstTask, boolean core) {
retry: //标记位,用于循环控制,也就是外层循环
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//当前线程池状态
//如果线程池状态如下,则返回添加失败
// 1. 线程池状态为STOP,TIDYING,TERMINATED中的一个
// 2. 线程池状态为SHUTDOWN,并且第一个任务不为空
// 3. 线程池状态为SHUTDOWN,并且工作队列为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//开始内层循环,执行到这里说明不是上述123中的状态。
for (;;) {
int wc = workerCountOf(c);//获取工作线程数量
//检查是否达到上限,并根据添加的线程类别(core或者非core)判断是否超过对应的最大值,超过也返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//上述校验通过以后,CAS方式增加一个工作线程,如果成功了,则跳出外层循环
break retry;
//执行到这里说明cas方式增加线程失败,那就重新检查一下线程池状态,然后内层循环继续,直到增加成功。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//执行到这里,说明已经成功增加了一个线程计数了。
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//使用第一个任务创建一个Worker
final Thread t = w.thread;//获取对应worker的线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());//检查线程池的状态
//线程池状态是运行的或者刚刚关闭,第一个任务尚未赋值
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//这里确保线程还没有被其他线程启动
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加worker到workers集合中
workers.add(w);
int s = workers.size();
//对比并更新最大到达数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;//表示已经增加了worker
}
} finally {
mainLock.unlock();
}
//如果已经成功增加了worker,就可以启动对应的线程了。
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//这里检查一下worker是否已经启动成功,如果没有启动成功,则执行添加失败操作
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;//返回工作线程是否启动成功
}
//简单看一下添加失败的逻辑
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);//添加失败则移除掉Set集合中对应的worker
//并且减少一个工作线程数量计数
decrementWorkerCount();
//尝试关闭线程池(感觉这里是为了释放空闲线程)
tryTerminate();
} finally {
mainLock.unlock();
}
}
- 提交一个Runnable任务,描述很简单,实现应该是所有方法中最复杂的了,话不多说,直接上源码。
public void shutdown()
- 尝试关闭线程池,执行后状态变为SHUTDOWN,继续完成已经提交的任务,但是新的任务不再被接受。
1
2
3
4
5
6
7
8
9
10
11
12
13public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//检查访问权?(这块有点懵)
advanceRunState(SHUTDOWN);//检查是否可以设置为SHUTDOWN并通过CAS操作设置为SHUTDOWN
interruptIdleWorkers();//中断线程
onShutdown(); // hook for ScheduledThreadPoolExecutor ,这里是空实现
} finally {
mainLock.unlock();
}
tryTerminate();//检测并尝试终止线程池
} - 几个内部调用的方法方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
//传参数为SHUTDOWN,则runStateAtLeast在SHUTDOWN、STOP、TIDYING和TERMINATED状态的时候为true,直接跳出循环。如果线程池状态为RUNNING,则进行CAS操作,更新状态位为SHUTDOWN,成功则结束。
if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);//中断所有线程,如果参数为true,则仅中断一个线程,具体见下
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//线程处于非中断状态,并且没有其他线程中断该线程(也就是说只能有一个线程进行中断操作)
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();//依次中断
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)//如果为true,则中断一个后跳出循环
break;
}
} finally {
mainLock.unlock();
}
}
//一个空实现
void onShutdown() {
}
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||//正在运行中,不能设置为TIDYING
runStateAtLeast(c, TIDYING) ||//线程池为TIDYING或者TERMINATED,其他线程已经开始关闭线程池
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))//队列中尚有需要执行的任务,需要等待执行完成,不能设置为TIDYING
return;
//上面一坨条件判断说白了就是在等线程池状态为SHUTDOWN并且队列为空或者线程池状态为STOP才会继续,否则放弃终止操作。
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);//ONLY_ONE在这里为true,也就是仅仅中断一个空闲的worker。
//补充一下:interruptIdleWorkers的作用是因为在getTask方法中执行workQueue.take()时,如果不执行中断会一直阻塞。在shutdown方法中,会中断所有空闲的工作线程,如果在执行shutdown时工作线程没有空闲,然后又去调用了getTask方法,这时如果workQueue中没有任务了,调用workQueue.take()时就会一直阻塞。所以每次在工作线程结束时调用tryTerminate方法来尝试中断一个空闲工作线程,避免在队列为空时取任务一直阻塞的情况。(引用自:https://www.cnblogs.com/liuzhihu/p/8177371.html)
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//CAS操作将线程池设置为TIDYING状态
try {
terminated();//执行一些清理操作
} finally {
ctl.set(ctlOf(TERMINATED, 0));//设置线程池状态为TERMINATED
termination.signalAll();//通知条件变量termination(也就是awaitTermination方法可以继续执行了)
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
- 尝试关闭线程池,执行后状态变为SHUTDOWN,继续完成已经提交的任务,但是新的任务不再被接受。
public List
shutdownNow() - 立即关闭线程池,设置状态为STOP,不再接受新的任务,并且中断正在运行的任务,返回队列中未执行的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//检查访问权?(这块有点懵)
advanceRunState(STOP);//cas方式设置状态为STOP
interruptWorkers();//中断正在运行的线程
tasks = drainQueue();//将队列中未执行的任务返回
} finally {
mainLock.unlock();
}
tryTerminate();//检测并尝试终止线程池
return tasks;
} - 看一下里面具体的方法,checkShutdownAccess、advanceRunState和tryTerminate见上
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)//遍历workers集合
w.interruptIfStarted();//依次中断
} finally {
mainLock.unlock();
}
}
Worker的interruptIfStarted方法如下:
void interruptIfStarted() {
Thread t;
//state小于0是不可中断的标识,只能在大于等于0的时候进行中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
//转移队列剩余任务方法
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);//将q的所有任务转移到taskList中
if (!q.isEmpty()) {//上一步操作可能会失败,再次检查(什么情况会失败?这块我也没太明白)
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
- 立即关闭线程池,设置状态为STOP,不再接受新的任务,并且中断正在运行的任务,返回队列中未执行的任务。
public boolean isShutdown()
- 通过ctl获取线程池的状态,没啥可说的
public boolean isTerminating()
- 通过ctl获取线程池的状态,只要不是运行的,不是TERMINATED,就处于这个状态
public boolean isTerminated()
- 通过ctl获取线程池的状态,并且状态是TERMINATED
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
- 带超时的等待线程池状态变为TERMINATED
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))//CAS循环判断状态是否为TERMINATED
return true;
if (nanos <= 0)//超时则返回false
return false;
nanos = termination.awaitNanos(nanos);//还记得上面的termination.signalAll()吧,就是这里等待通知
}
} finally {
mainLock.unlock();
}
}
- 带超时的等待线程池状态变为TERMINATED
public boolean prestartCoreThread()
- 调用该方法,则执行启动一个核心线程,源码比较简单,只要不足corePoolSize,就执行增加操作。
1
2
3
4public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
- 调用该方法,则执行启动一个核心线程,源码比较简单,只要不足corePoolSize,就执行增加操作。
public int prestartAllCoreThreads()
- 一次性启动Core工作线程到corePoolSize,返回启动了几个线程。
1
2
3
4
5
6public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
- 一次性启动Core工作线程到corePoolSize,返回启动了几个线程。
public boolean remove(Runnable task)
- 移除一个task,移除操作之后会执行tryTerminate()方法
1
2
3
4
5
6public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
//会检查线程池状态,并根据状态决定是否终止线程池(特别针对的场景就是shudown状态+空队列,就可以进入下一个状态)
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
- 移除一个task,移除操作之后会执行tryTerminate()方法
public void purge()
- 尝试从队列中移除所有已经取消了的Future任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public void purge() {
final BlockingQueue<Runnable> q = workQueue;
try {
Iterator<Runnable> it = q.iterator();
while (it.hasNext()) {
Runnable r = it.next();
//移除掉已经取消的Future任务
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough) {//遍历过程中发生了其他修改,快速失败采用数组快照方式删除
// Take slow path if we encounter interference during traversal.
// Make copy for traversal and call remove for cancelled entries.
// The slow path is more likely to be O(N*N).
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
q.remove(r);
}
//会检查线程池状态,并根据状态决定是否终止线程池(特别针对的场景就是shudown状态+空队列,就可以进入下一个状态)
tryTerminate(); // In case SHUTDOWN and now empty
}
- 尝试从队列中移除所有已经取消了的Future任务
待分析内容
简单概述
- 上面这一堆只是简单分析了一下ThreadPoolExecutor内部的方法,这个类继承了AbstractExecutorService,之后会在分析一下AbstractExecutorService,此外ThreadPoolExecutor内部还有一个Worker类,它承了AbstractQueuedSynchronizer(AQS),这两个也是后面要分析的重点。尤其是AQS,后面的各种锁相关都是依赖于它
TODO项
- AbstractExecutorService
- Worker
- AbstractQueuedSynchronizer