AQS分析第一篇,整理得快吐血了,不过话说回来,看一遍和整理了一遍真的完全不一样,收获还是很多的。
这里基本把AQS整个类分析了一遍,剩下还有就是条件对象以及AQS应用了,后续有时间在整理了。
AbstractOwnableSynchronizer
- 比较简单,内部一个exclusiveOwnerThread,附带get和set方法,不多说
Node
- AbstractQueuedSynchronizer 核心依赖,内部队列就是由Node组成
核心Field
状态类
- int CANCELLED = 1;
- 代表被取消了
- int SIGNAL = -1;
- 代表需要被唤醒
- int CONDITION = -2;
- 代表在条件队列中等待
- int PROPAGATE = -3;
- 代表释放资源的时候需要通知其他Node
- int waitStatus
- 代表当前Node的等待状态,取值为CANCELLED、SIGNAL、CONDITION、PROPAGATE,默认初始化为0
记录阻塞模式
- Node SHARED
- 代表该Node是因为获取共享资源被阻塞而放入AQS
- Node EXCLUSIVE
- 代表该Node是因为获取独占资源被阻塞而放入AQS
链表相关
- 提供前驱和后继节点的访问方法,也就是说链表是双向的
- Node prev
- 记录当前节点的前驱节点
- Node next
- 记录当前节点的后继节点
其他
- Thread thread
- thread用于存放进入AQS队列的里面的线程
- Node nextWaiter
- 在Node作为同步队列节点时,nextWaiter可能有两个值:EXCLUSIVE、SHARED标识当前节点是独占模式还是共享模式;
- 在Node作为等待队列节点使用时,nextWaiter保存后继节点。
核心方法
boolean isShared()
- 当前节点获取资源采用的是否为共享方式
1
2
3final boolean isShared() {
return nextWaiter == SHARED;
}
Node predecessor()
- 获取前置节点,如果前置节点为null,则抛出NPE异常
1
2
3
4
5
6
7final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
AbstractQueuedSynchronizer
核心Field
- Node head
- 内部队列的头结点
- Node tail
- 内部队列的尾节点
- int state
- 状态位,在不同的子类中有不同的含义
- long spinForTimeoutThreshold
- 自旋时间,低于这个时间则直接进行空循环,然后重新尝试获取资源
核心方法
int getState()
- 用的很多,但是没啥可说的
void setState(int newState)
- 用的很多,但是没啥可说的
boolean compareAndSetState(int expect, int update)
- Cas更新状态操作,也没啥可说的
Node enq(final Node node)
- 入队操作,如果队列没有节点,则tail为null,这个时候需要加入一个哨兵节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))//加入一个哨兵节点到队列尾部,再次循环
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;//返回原末尾节点
}
}
}
}
Node addWaiter(Node mode)
- 这个作者实现比较有趣,先用快速方式尝试添加节点,成功则返回新添加的节点,失败则通过enq以循环的方式将node入队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17private Node addWaiter(Node mode) {
//根据当前线程以及模式(共享或者独占)创建一个节点
Node node = new Node(Thread.currentThread(), mode);
//尝试直接添加到队列尾部(所谓的快速添加)
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS添加成功则返回结果,失败则只需enq
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//说明快速添加遇到竞争,通过enq进行入队操作
enq(node);
return node;
}
void setHead(Node node)
- 设置头结点,注意一下node的thread和prev会设置为null
void unparkSuccessor(Node node)
- 该方法用于唤醒等待队列中的下一个线程,下一个线程并不一定是当前节点的next节点,需要根据其状态来进行查找,找到之后执行LockSupport.unpark唤醒对应的线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
void doReleaseShared()
- 共享模式的释放操作,一般来说,只需要判断两种情况:
- SIGNAL代表后继节点之前被阻塞了需要释放
- PROPAGATE代表共享模式下可以继续进行acquire
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23private void doReleaseShared() {
for (;;) {
Node h = head;
//这里的判断是处理头结点和尾结点都存在的情况,并且队列里节点总数大于1
if (h != null && h != tail) {
int ws = h.waitStatus;
//Node.SIGNAL表示后继节点需要被唤醒
if (ws == Node.SIGNAL) {
//h从SIGNAL设置为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//执行唤醒操作,这里会将h.waitStatus设置为0,补充,每次只唤醒一个线程
unparkSuccessor(h);
}
//如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去,也就是h从0设置为PROPAGATE,
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//头节点没有发生变化,可以退出循环,如果头结点发生了变化,为了使自己的唤醒动作可以传递,必须进行重试
if (h == head)
break;
}
}
void setHeadAndPropagate(Node node, int propagate)
- 首先执行setHead方法,在这之后检查条件,如果满足条件则唤醒后继节点(因为是共享模式,所以后继节点也一并唤醒)
1
2
3
4
5
6
7
8
9
10
11
12private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
//检查条件
// propagate > 0 表示调用方指明了后继节点需要被唤醒
// 头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点(看第一行和第二行代码)
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
void cancelAcquire(Node node)
- 取消正在获取资源的操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36private void cancelAcquire(Node node) {
if (node == null)
return;
//首先当前node不在关联任何线程
node.thread = null;
Node pred = node.prev;
//CANCELLED的值为1,该判断也就是跳过已经取消的节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//这里指找到一个有效的前置节点
Node predNext = pred.next;
//将节点node设置为CANCELLED状态
node.waitStatus = Node.CANCELLED;
//判断node是否为tail节点,如果是tail节点,则cas进行替换,替换为找到的有效前置节点pred
if (node == tail && compareAndSetTail(node, pred)) {
执行成则pred的下一个节点为null(已经是tail节点)
compareAndSetNext(pred, predNext, null);
} else {
//执行到这里说明node不是tail节点,或者cas操作失败了
int ws;
// pred如果不是head节点,并且thread不为空,并且满足下面条件之一
// 1. pred.waitStatus为SIGNAL
// 2. pred.waitStatus <= 0 (SIGNAL,CONDITION,PROPAGATE,0),并成功将pred的WaitStatus进行cas替换为SIGNAL
if (pred != head && ( (ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)) ) && pred.thread != null) {
Node next = node.next;
//将前置节点的next指向当前节点的next(说白了就是删除链表中的当前节点,只不过是在cas中进行操作)
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//不满足条件,也就是说node为head的后继节点,直接进行唤醒
unparkSuccessor(node);
}
// 这个就是清除引用,快速gc用的
node.next = node;
}
}
boolean shouldParkAfterFailedAcquire(Node pred, Node node)
- 根据前置节点判断当前节点是否应该被阻塞,同时清理掉CANCELLED节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果继的节点状态为SIGNAL,则当前节点需要unpark,返回true
if (ws == Node.SIGNAL)
return true;
//否则返回false,并进行如下操作
//ws > 0说明前置节点已经被取消(CANCELLED = 1), 这时需要继续往前找,直到找到 waitStatus 不为 CANCELLED ,然后返回false。所谓清理CANCELLED节点就是在这里跳过对应的节点。
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果节点状态不是CANCELLED,则cas更新waitStatus为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
void selfInterrupt()
- 这个方法比较简单,就是调用当前线程的中断方法
1
2
3static void selfInterrupt() {
Thread.currentThread().interrupt();
}
boolean parkAndCheckInterrupt()
- 阻塞当前线程并执行中断检查(会清除中断标识)
1
2
3
4private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
boolean acquireQueued(final Node node, int arg)
- 尝试获取锁,成功返回中断状态,失败则则阻塞。阻塞过程中被中断,会返回被中断过标识
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;//默认非中断
for (;;) {
//获取当前节点的前置节点
final Node p = node.predecessor();
//如果前置节点为head节点,则尝试获取资源
//每次只允许当构造节点的前驱节点是头结点才去获取同步状态
if (p == head && tryAcquire(arg)) { //只有一个线程可以通过
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//否则根据是否可以进行park操作进行阻塞
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//如果没有更新failed标志为,则发生异常,取消node节点
if (failed)
cancelAcquire(node);
}
}
void doAcquireInterruptibly(int arg)
- 获取资源操作,如果阻塞过程中被中断,则会抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22private void doAcquireInterruptibly(int arg) throws InterruptedException {
//添加一个独占资源到队列末尾
final Node node = addWaiter(Node.EXCLUSIVE);
//以下代码基本同acquireQueued
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();//这里直接抛出异常
}
} finally {
if (failed)
cancelAcquire(node);
}
}
boolean doAcquireNanos(int arg, long nanosTimeout)
- 带有超时的去获取独占资源,如果被中断,会抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)//时间判断
return false;
final long deadline = System.nanoTime() + nanosTimeout;//结束时间
final Node node = addWaiter(Node.EXCLUSIVE);//添加独占资源node到队列
boolean failed = true;
try {
for (;;) {
//获取资源
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();//当前还可以等待时间
if (nanosTimeout <= 0L)//已经超时
return false;
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)//阻塞nanosTimeout
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())//线程被中断,则抛出异常
throw new InterruptedException();
}
} finally {
if (failed)//没有成功则取消节点
cancelAcquire(node);
}
}
void doAcquireShared(int arg)
- 以共享的方式获取资源,失败则阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//添加一个共享节点到队列尾部
boolean failed = true;//失败标志位
try {
boolean interrupted = false;//中断标志位
for (;;) {
final Node p = node.predecessor();
if (p == head) {//必须是头节点才可以
int r = tryAcquireShared(arg);//获取资源
//r等于0表示不用唤醒后继节点,大于0需要
if (r >= 0) {
//尝试唤醒后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
//没有中断,则返回
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//获取失败,则进行阻塞,并将前驱节点的状态改成SIGNAL
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
void doAcquireSharedInterruptibly(int arg)
- 基本同doAcquireShared,被中断则抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();//这里抛出异常
}
} finally {
if (failed)
cancelAcquire(node);
}
}
boolean doAcquireSharedNanos(int arg, long nanosTimeout)
- 和上面的没啥区别,就是多了超时控制而已,被中断也是抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
boolean tryAcquire(int arg)
- AQS没有提供具体实现,需要子类实现
1
2
3protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
boolean tryRelease(int arg)
- AQS没有提供具体实现,需要子类实现
1
2
3protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
int tryAcquireShared(int arg)
- AQS没有提供具体实现,需要子类实现
1
2
3protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
boolean tryReleaseShared(int arg)
- AQS没有提供具体实现,需要子类实现
1
2
3protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
boolean isHeldExclusively()
- AQS没有提供具体实现,需要子类实现
1
2
3protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
void acquire(int arg)
- 以独占的方式去获取资源,忽略中断。
1
2
3
4
5public final void acquire(int arg) {
//至少执行一次tryAcquire,成功则返回,失败则进行线程阻塞状态,等待唤醒重新获取资源
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
void acquireInterruptibly(int arg)
- 以独占的方式去获取资源,等待期间会被中断。如果线程本身已经被中断,调用该方法会立即抛出异常
1
2
3
4
5
6public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
boolean tryAcquireNanos(int arg, long nanosTimeout)
- 带有超时的获取独占资源,也会抛出中断异常
1
2
3
4
5
6public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
boolean release(int arg)
- 资源释放
1
2
3
4
5
6
7
8
9
10public final boolean release(int arg) {
//保证原子方式释放资源,同一时刻只能有一个线程成功
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒当前节点的后继节点所包含的线程
return true;
}
return false;
}
void acquireShared(int arg)
- 以共享模式获取状态
1
2
3
4
5
6public final void acquireShared(int arg) {
//尝试获取共享状态
if (tryAcquireShared(arg) < 0)
//获取失败进入sync队列
doAcquireShared(arg);
}
void acquireSharedInterruptibly(int arg)
- 相比acquireShared,只是增加了可中断
1
2
3
4
5
6public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
- 在acquireSharedInterruptibly的基础上增加了超时
1
2
3
4
5
6
7public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
boolean releaseShared(int arg)
- 释放共享资源
1
2
3
4
5
6
7
8
9public final boolean releaseShared(int arg) {
//尝试释放共享资源
if (tryReleaseShared(arg)) {
//唤醒的过程,上文已经分析
doReleaseShared();
return true;
}
return false;
}
boolean hasQueuedThreads()
- 队列中是否有线程在等待获取资源
1
2
3public final boolean hasQueuedThreads() {
return head != tail;
}
boolean hasContended()
- 是否其他线程也竞争获取资源(因为head是公用的)
1
2
3public final boolean hasContended() {
return head != null;
}
Thread getFirstQueuedThread()
- 返回队列中的第一个线程,如果快速路径失败(head == tail),则调用fullGetFirstQueuedThread查找
1
2
3
4public final Thread getFirstQueuedThread() {
// handle only fast path, else relay
return (head == tail) ? null : fullGetFirstQueuedThread();
}
Thread fullGetFirstQueuedThread()
- 返回队列中第一个(等待时间最长的)线程,如果目前没有将任何线程加入队列,则返回 null.
- 在此实现中,该操作是以固定时间返回的,但是,如果其他线程目前正在并发修改该队列,则可能出现循环争用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private Thread fullGetFirstQueuedThread() {
Node h, s;
Thread st;
if (((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null) ||
((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null))
return st;
Node t = tail;
Thread firstThread = null;
while (t != null && t != head) {
Thread tt = t.thread;
if (tt != null)
firstThread = tt;
t = t.prev;
}
return firstThread;
}
boolean isQueued(Thread thread)
- 判断thread是否在队列中等待获取资源
1
2
3
4
5
6
7
8public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)
if (p.thread == thread)
return true;
return false;
}
boolean apparentlyFirstQueuedIsExclusive()
- 在head不为null,head的next不为null,head的next不是共享的,head的thread不为空的条件下返回true,否则返回false
- 作用就是读锁不应该让写锁始终等待。
1
2
3
4
5
6
7final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
boolean hasQueuedPredecessors()
- 判断当前线程是不是在CLH队列的队首,来返回AQS中是不是有比当前线程等待更久的线程。
1
2
3
4
5
6
7public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
int getQueueLength()
- 获取队列长度
1
2
3
4
5
6
7
8public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
if (p.thread != null)
++n;
}
return n;
}
Collection getQueuedThreads()
- 获取线程队列
1
2
3
4
5
6
7
8
9public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
Collection getExclusiveQueuedThreads()
- 获取独占资源的线程队列
1
2
3
4
5
6
7
8
9
10
11public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (!p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
Collection getSharedQueuedThreads()
- 获取共享资源的线程队列
1
2
3
4
5
6
7
8
9
10
11public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
boolean isOnSyncQueue(Node node)
- 判断该节点是否在CLH队列中
1
2
3
4
5
6
7
8
9
10final boolean isOnSyncQueue(Node node) {
//如果该节点的状态为CONDITION(该状态只能在CONDITION队列中出现,CLH队列中不会出现CONDITION状态),或者该节点的prev指针为null,则该节点一定不在CLH队列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果该节点的next(不是nextWaiter,next指针在CLH队列中指向下一个节点)状态不为null,则该节点一定在CLH队列中
if (node.next != null) // If has successor, it must be on queue
return true;
//遍历CLH队列(从尾节点开始遍历)查找该节点
return findNodeFromTail(node);
}
boolean findNodeFromTail(Node node)
- 从tail往前寻找节点
1
2
3
4
5
6
7
8
9
10private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
boolean transferForSignal(Node node)
- 将节点添加到CLH队列中
1
2
3
4
5
6
7
8
9
10
11
12
13final boolean transferForSignal(Node node) {
//如果CAS失败,则当前节点的状态为CANCELLED
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//首先enq将该node添加到CLH队列中
Node p = enq(node);
int ws = p.waitStatus;
//如果p是一个取消(ws > 0)了的节点,或者对p进行CAS设置失败,则唤醒node节点,让node所在线程进入到acquireQueue方法中,重新进行相关操作
//否则,由于该节点的前驱节点已经是signal状态了,不用在此处唤醒await中的线程,唤醒工作留给CLH队列中前驱节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);//唤醒
return true;
}
boolean transferAfterCancelledWait(Node node)
- 将当前Node强制transfer到CLH队列中
1
2
3
4
5
6
7
8
9
10
11final boolean transferAfterCancelledWait(Node node) {
//将该节点状态由CONDITION变成0,调用enq将该节点从CONDITION队列添加到CLH队列中(但是在CONDITION队列中的nextWaiter连接并没有取消)
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
//循环检测该node是否已经成功添加到CLH队列中
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
int fullyRelease(Node node)
- 完全释放锁,释放成功则返回,失败则将当前节点的状态设置成cancelled表示当前节点失效
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;//失败则当前node状态为CANCELLED
}
}
boolean owns(ConditionObject condition)
- 判断条件对象拥有者
1
2
3public final boolean owns(ConditionObject condition) {
return condition.isOwnedBy(this);
}
boolean hasWaiters(ConditionObject condition)
- 条件队列是否有等待者
1
2
3
4
5public final boolean hasWaiters(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.hasWaiters();
}
int getWaitQueueLength(ConditionObject condition)
- 获取条件队列等待者数量
1
2
3
4
5public final int getWaitQueueLength(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitQueueLength();
}
Collection getWaitingThreads(ConditionObject condition)
- 获取条件队列等待者线程
1
2
3
4
5public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitingThreads();
}
辅助Field及方法
就不一一解释了
- Cas相关Field
- Unsafe unsafe;
- long stateOffset;
- long headOffset;
- long tailOffset;
- long waitStatusOffset;
- long nextOffset;
- Cas相关Method
- boolean compareAndSetHead(Node update)
- boolean compareAndSetTail(Node expect, Node update)
- boolean compareAndSetWaitStatus(Node node, int expect, int update)
- boolean compareAndSetNext(Node node, Node expect, Node update)