结构
其实从结构上来看,Worker十分简单。实现了Runnable接口,同时继承了AQS队列。如下图所示:
Worker的方法也不多,也比较简单,如下图所示:
分析
内部Field
- 内部Field不多,如下:
- Thread thread 实际的工作线程
- Runnable firstTask 初始化的第一个任务
- long completedTasks 当前Worker已经完成的任务数
- 在补充一下父类的state
- 0 代表是未锁定状态
- 1 代表是锁定状态
- -1 代表是不允许被中断,在构造参数中设置
接下来简单分析一下各个方法:
方法
public void run()
- 直接调用线程池的runWorker方法,之后分析
1
2
3public void run() {
runWorker(this);
}
protected boolean isHeldExclusively()
- 是否是独占排他的
1
2
3protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused)
- 尝试获取锁,参考AQS,这里还是使用CAS进行操作,失败则快速返回
1
2
3
4
5
6
7protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused)
- 尝试释放锁,这个貌似只会成功,不会失败
1
2
3
4
5protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock()
- 加锁,不过这里是阻塞式的
1
public void lock() { acquire(1); }
public boolean tryLock()
- 尝试加锁,调用tryAcquire
1
public boolean tryLock() { return tryAcquire(1); }
public void unlock()
- 释放锁操作,参考AQS
1
public void unlock() { release(1); }
public boolean isLocked()
- 是否处于锁定状态,调用isHeldExclusively方法,也就是看state是否为0
1
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted()
- 如果处于运行状态,则进行中断。state>=0代表可以进行中断。
1
2
3
4
5
6
7
8
9void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
关联方法
runWorker
- Worker直接调用线程池的runWorker方法,将自身作为参数,执行任务,具体如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48final void runWorker(Worker w) {
//当前工作线程
Thread wt = Thread.currentThread();
//获取待执行的初始任务
Runnable task = w.firstTask;
//清除firstTask
w.firstTask = null;
//释放w锁(这个时候可以进行中断操作)
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//开始循环获取任务
while (task != null || (task = getTask()) != null) {
//获取任务之后先进行加锁
w.lock();
//检查线程池状态,是否需要中断。
//这块逻辑比较绕,整理一下
// 首先wt也就是当前线程,不能被中断。
// 如果线程池的状态为STOP,TIDYING,TERMINATED 则直接中断
// (剩下的就是原文中的注释了)如果线程池正在停止过程中,确保线程是中断的。否则就确保线程不会被中断。
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(),STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//这里实际上是空实现
Throwable thrown = null;
try {
task.run();//实际执行
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//这里也是空实现
}
} finally {
task = null;
w.completedTasks++;//执行完成后,完成任务+1
w.unlock();//执行完成后释放锁
}
}
completedAbruptly = false;
} finally {
//清理工作,执行到这里代表Worker已经准备销毁了
processWorkerExit(w, completedAbruptly);
}
}
getTask
- 获取任务的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
//先检查线程池的状态
int c = ctl.get();
int rs = runStateOf(c);
//如果已经关闭并且队列为空则返回null,并减少一个工作线程
//如果已经为STOP,TIDYING,TERMINATED 则减少一个工作线程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取工作线程数量
int wc = workerCountOf(c);
//判断是否需要清理Worker
// 一种是allowCoreThreadTimeOut=true的情况
//一种是工作线程数量已经超过核心线程数量了
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//状态检查
//满足以下两个条件,则会减少工作线程数量
//1.已经超时或者工作线程数量超过最大线程数量的
//2.至少有一个工作线程或者任务队列为空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://带有超时的方式获取,超时之后返回null
workQueue.take();//阻塞方式获取
//没有超时则返回结果,否则设置超时状态
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit
- 做一些收尾工作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 还记得runWork方法中的completedAbruptly么,就是这个了,为true代表没有执行的改变为false,突然执行到这里了。
decrementWorkerCount();//减少工作线程数量
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;//更新一下总共完成的任务
workers.remove(w);//从Worker集合中中移除自己
} finally {
mainLock.unlock();
}
//尝试设置线程池为TERMINATED,见线程池部分分析
//线程池为SHUTDOWN且队列为空或者线程池状态为STOP,则触发设置线程池为TERMINATED
tryTerminate();
int c = ctl.get();//获取当前线程池状态
if (runStateLessThan(c, STOP)) {//是否已经停止或者终止
//是否突然过来的,如果不是突然过来的,代表正常结束
if (!completedAbruptly) {
//根据allowCoreThreadTimeOut获取线程池数量最小值
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//队列不为空则最小值不能为0
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//当前工作线程数量大于等于最小值,则代表还不能结束,继续执行
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//执行到这里说明当前线程数量小于min的值,需要添加一个Worker
//或者突然执行过来的,可能有异常,添加一个Worker
addWorker(null, false);
}
}