0%

Java线程池分析-AbstractQueuedSynchronizer

AQS分析第一篇,整理得快吐血了,不过话说回来,看一遍和整理了一遍真的完全不一样,收获还是很多的。
这里基本把AQS整个类分析了一遍,剩下还有就是条件对象以及AQS应用了,后续有时间在整理了。


AbstractOwnableSynchronizer

  • 比较简单,内部一个exclusiveOwnerThread,附带get和set方法,不多说

Node

  • AbstractQueuedSynchronizer 核心依赖,内部队列就是由Node组成

核心Field

状态类

  • int CANCELLED = 1;
    • 代表被取消了
  • int SIGNAL = -1;
    • 代表需要被唤醒
  • int CONDITION = -2;
    • 代表在条件队列中等待
  • int PROPAGATE = -3;
    • 代表释放资源的时候需要通知其他Node
  • int waitStatus
    • 代表当前Node的等待状态,取值为CANCELLED、SIGNAL、CONDITION、PROPAGATE,默认初始化为0

记录阻塞模式

  • Node SHARED
    • 代表该Node是因为获取共享资源被阻塞而放入AQS
  • Node EXCLUSIVE
    • 代表该Node是因为获取独占资源被阻塞而放入AQS

链表相关

  • 提供前驱和后继节点的访问方法,也就是说链表是双向的
  • Node prev
    • 记录当前节点的前驱节点
  • Node next
    • 记录当前节点的后继节点

其他

  • Thread thread
    • thread用于存放进入AQS队列的里面的线程
  • Node nextWaiter
    • 在Node作为同步队列节点时,nextWaiter可能有两个值:EXCLUSIVE、SHARED标识当前节点是独占模式还是共享模式;
    • 在Node作为等待队列节点使用时,nextWaiter保存后继节点。

核心方法

boolean isShared()

  • 当前节点获取资源采用的是否为共享方式
    1
    2
    3
    final boolean isShared() {
    return nextWaiter == SHARED;
    }

Node predecessor()

  • 获取前置节点,如果前置节点为null,则抛出NPE异常
    1
    2
    3
    4
    5
    6
    7
    final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
    throw new NullPointerException();
    else
    return p;
    }

AbstractQueuedSynchronizer

核心Field

  • Node head
    • 内部队列的头结点
  • Node tail
    • 内部队列的尾节点
  • int state
    • 状态位,在不同的子类中有不同的含义
  • long spinForTimeoutThreshold
    • 自旋时间,低于这个时间则直接进行空循环,然后重新尝试获取资源

核心方法

int getState()

  • 用的很多,但是没啥可说的

void setState(int newState)

  • 用的很多,但是没啥可说的

boolean compareAndSetState(int expect, int update)

  • Cas更新状态操作,也没啥可说的

Node enq(final Node node)

  • 入队操作,如果队列没有节点,则tail为null,这个时候需要加入一个哨兵节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private Node enq(final Node node) {
    for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
    if (compareAndSetHead(new Node()))//加入一个哨兵节点到队列尾部,再次循环
    tail = head;
    } else {
    node.prev = t;
    if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;//返回原末尾节点
    }
    }
    }
    }

Node addWaiter(Node mode)

  • 这个作者实现比较有趣,先用快速方式尝试添加节点,成功则返回新添加的节点,失败则通过enq以循环的方式将node入队
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    private Node addWaiter(Node mode) {
    //根据当前线程以及模式(共享或者独占)创建一个节点
    Node node = new Node(Thread.currentThread(), mode);
    //尝试直接添加到队列尾部(所谓的快速添加)
    Node pred = tail;
    if (pred != null) {
    node.prev = pred;
    //CAS添加成功则返回结果,失败则只需enq
    if (compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
    }
    }
    //说明快速添加遇到竞争,通过enq进行入队操作
    enq(node);
    return node;
    }

void setHead(Node node)

  • 设置头结点,注意一下node的thread和prev会设置为null

void unparkSuccessor(Node node)

  • 该方法用于唤醒等待队列中的下一个线程,下一个线程并不一定是当前节点的next节点,需要根据其状态来进行查找,找到之后执行LockSupport.unpark唤醒对应的线程。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    }
    if (s != null)
    LockSupport.unpark(s.thread);
    }

void doReleaseShared()

  • 共享模式的释放操作,一般来说,只需要判断两种情况:
    • SIGNAL代表后继节点之前被阻塞了需要释放
    • PROPAGATE代表共享模式下可以继续进行acquire
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      private void doReleaseShared() {
      for (;;) {
      Node h = head;
      //这里的判断是处理头结点和尾结点都存在的情况,并且队列里节点总数大于1
      if (h != null && h != tail) {
      int ws = h.waitStatus;
      //Node.SIGNAL表示后继节点需要被唤醒
      if (ws == Node.SIGNAL) {
      //h从SIGNAL设置为0
      if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
      continue;
      //执行唤醒操作,这里会将h.waitStatus设置为0,补充,每次只唤醒一个线程
      unparkSuccessor(h);
      }
      //如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去,也就是h从0设置为PROPAGATE,
      else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
      continue;
      }
      //头节点没有发生变化,可以退出循环,如果头结点发生了变化,为了使自己的唤醒动作可以传递,必须进行重试
      if (h == head)
      break;
      }
      }

void setHeadAndPropagate(Node node, int propagate)

  • 首先执行setHead方法,在这之后检查条件,如果满足条件则唤醒后继节点(因为是共享模式,所以后继节点也一并唤醒)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    //检查条件
    // propagate > 0 表示调用方指明了后继节点需要被唤醒
    // 头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点(看第一行和第二行代码)
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
    doReleaseShared();
    }
    }

void cancelAcquire(Node node)

  • 取消正在获取资源的操作
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    private void cancelAcquire(Node node) {
    if (node == null)
    return;
    //首先当前node不在关联任何线程
    node.thread = null;
    Node pred = node.prev;
    //CANCELLED的值为1,该判断也就是跳过已经取消的节点
    while (pred.waitStatus > 0)
    node.prev = pred = pred.prev;
    //这里指找到一个有效的前置节点
    Node predNext = pred.next;
    //将节点node设置为CANCELLED状态
    node.waitStatus = Node.CANCELLED;
    //判断node是否为tail节点,如果是tail节点,则cas进行替换,替换为找到的有效前置节点pred
    if (node == tail && compareAndSetTail(node, pred)) {
    执行成则pred的下一个节点为null(已经是tail节点)
    compareAndSetNext(pred, predNext, null);
    } else {
    //执行到这里说明node不是tail节点,或者cas操作失败了
    int ws;
    // pred如果不是head节点,并且thread不为空,并且满足下面条件之一
    // 1. pred.waitStatus为SIGNAL
    // 2. pred.waitStatus <= 0 (SIGNAL,CONDITION,PROPAGATE,0),并成功将pred的WaitStatus进行cas替换为SIGNAL
    if (pred != head && ( (ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)) ) && pred.thread != null) {
    Node next = node.next;
    //将前置节点的next指向当前节点的next(说白了就是删除链表中的当前节点,只不过是在cas中进行操作)
    if (next != null && next.waitStatus <= 0)
    compareAndSetNext(pred, predNext, next);
    } else {
    //不满足条件,也就是说node为head的后继节点,直接进行唤醒
    unparkSuccessor(node);
    }
    // 这个就是清除引用,快速gc用的
    node.next = node;
    }
    }

boolean shouldParkAfterFailedAcquire(Node pred, Node node)

  • 根据前置节点判断当前节点是否应该被阻塞,同时清理掉CANCELLED节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    //如果继的节点状态为SIGNAL,则当前节点需要unpark,返回true
    if (ws == Node.SIGNAL)
    return true;
    //否则返回false,并进行如下操作
    //ws > 0说明前置节点已经被取消(CANCELLED = 1), 这时需要继续往前找,直到找到 waitStatus 不为 CANCELLED ,然后返回false。所谓清理CANCELLED节点就是在这里跳过对应的节点。
    if (ws > 0) {
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    } else {
    //如果节点状态不是CANCELLED,则cas更新waitStatus为SIGNAL
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
    }

void selfInterrupt()

  • 这个方法比较简单,就是调用当前线程的中断方法
    1
    2
    3
    static void selfInterrupt() {
    Thread.currentThread().interrupt();
    }

boolean parkAndCheckInterrupt()

  • 阻塞当前线程并执行中断检查(会清除中断标识)
    1
    2
    3
    4
    private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
    }

boolean acquireQueued(final Node node, int arg)

  • 尝试获取锁,成功返回中断状态,失败则则阻塞。阻塞过程中被中断,会返回被中断过标识
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;//默认非中断
    for (;;) {
    //获取当前节点的前置节点
    final Node p = node.predecessor();
    //如果前置节点为head节点,则尝试获取资源
    //每次只允许当构造节点的前驱节点是头结点才去获取同步状态
    if (p == head && tryAcquire(arg)) { //只有一个线程可以通过
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
    }
    //否则根据是否可以进行park操作进行阻塞
    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    //如果没有更新failed标志为,则发生异常,取消node节点
    if (failed)
    cancelAcquire(node);
    }
    }

void doAcquireInterruptibly(int arg)

  • 获取资源操作,如果阻塞过程中被中断,则会抛出异常
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
    //添加一个独占资源到队列末尾
    final Node node = addWaiter(Node.EXCLUSIVE);
    //以下代码基本同acquireQueued
    boolean failed = true;
    try {
    for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return;
    }
    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    throw new InterruptedException();//这里直接抛出异常
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

boolean doAcquireNanos(int arg, long nanosTimeout)

  • 带有超时的去获取独占资源,如果被中断,会抛出异常
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)//时间判断
    return false;
    final long deadline = System.nanoTime() + nanosTimeout;//结束时间
    final Node node = addWaiter(Node.EXCLUSIVE);//添加独占资源node到队列
    boolean failed = true;
    try {
    for (;;) {
    //获取资源
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return true;
    }
    nanosTimeout = deadline - System.nanoTime();//当前还可以等待时间
    if (nanosTimeout <= 0L)//已经超时
    return false;
    if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)//阻塞nanosTimeout
    LockSupport.parkNanos(this, nanosTimeout);
    if (Thread.interrupted())//线程被中断,则抛出异常
    throw new InterruptedException();
    }
    } finally {
    if (failed)//没有成功则取消节点
    cancelAcquire(node);
    }
    }

void doAcquireShared(int arg)

  • 以共享的方式获取资源,失败则阻塞
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);//添加一个共享节点到队列尾部
    boolean failed = true;//失败标志位
    try {
    boolean interrupted = false;//中断标志位
    for (;;) {
    final Node p = node.predecessor();
    if (p == head) {//必须是头节点才可以
    int r = tryAcquireShared(arg);//获取资源
    //r等于0表示不用唤醒后继节点,大于0需要
    if (r >= 0) {
    //尝试唤醒后继节点
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    //没有中断,则返回
    if (interrupted)
    selfInterrupt();
    failed = false;
    return;
    }
    }
    //获取失败,则进行阻塞,并将前驱节点的状态改成SIGNAL
    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

void doAcquireSharedInterruptibly(int arg)

  • 基本同doAcquireShared,被中断则抛出异常
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
    for (;;) {
    final Node p = node.predecessor();
    if (p == head) {
    int r = tryAcquireShared(arg);
    if (r >= 0) {
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    failed = false;
    return;
    }
    }
    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    throw new InterruptedException();//这里抛出异常
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

boolean doAcquireSharedNanos(int arg, long nanosTimeout)

  • 和上面的没啥区别,就是多了超时控制而已,被中断也是抛出异常
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
    return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
    for (;;) {
    final Node p = node.predecessor();
    if (p == head) {
    int r = tryAcquireShared(arg);
    if (r >= 0) {
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    failed = false;
    return true;
    }
    }
    nanosTimeout = deadline - System.nanoTime();
    if (nanosTimeout <= 0L)
    return false;
    if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
    LockSupport.parkNanos(this, nanosTimeout);
    if (Thread.interrupted())
    throw new InterruptedException();
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

boolean tryAcquire(int arg)

  • AQS没有提供具体实现,需要子类实现
    1
    2
    3
    protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
    }

boolean tryRelease(int arg)

  • AQS没有提供具体实现,需要子类实现
    1
    2
    3
    protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
    }

int tryAcquireShared(int arg)

  • AQS没有提供具体实现,需要子类实现
    1
    2
    3
    protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
    }

boolean tryReleaseShared(int arg)

  • AQS没有提供具体实现,需要子类实现
    1
    2
    3
    protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
    }

boolean isHeldExclusively()

  • AQS没有提供具体实现,需要子类实现
    1
    2
    3
    protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
    }

void acquire(int arg)

  • 以独占的方式去获取资源,忽略中断。
    1
    2
    3
    4
    5
    public final void acquire(int arg) {
    //至少执行一次tryAcquire,成功则返回,失败则进行线程阻塞状态,等待唤醒重新获取资源
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }

void acquireInterruptibly(int arg)

  • 以独占的方式去获取资源,等待期间会被中断。如果线程本身已经被中断,调用该方法会立即抛出异常
    1
    2
    3
    4
    5
    6
    public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    if (!tryAcquire(arg))
    doAcquireInterruptibly(arg);
    }

boolean tryAcquireNanos(int arg, long nanosTimeout)

  • 带有超时的获取独占资源,也会抛出中断异常
    1
    2
    3
    4
    5
    6
    public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    return tryAcquire(arg) ||
    doAcquireNanos(arg, nanosTimeout);
    }

boolean release(int arg)

  • 资源释放
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public final boolean release(int arg) {
    //保证原子方式释放资源,同一时刻只能有一个线程成功
    if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
    unparkSuccessor(h);//唤醒当前节点的后继节点所包含的线程
    return true;
    }
    return false;
    }

void acquireShared(int arg)

  • 以共享模式获取状态
    1
    2
    3
    4
    5
    6
    public final void acquireShared(int arg) {
    //尝试获取共享状态
    if (tryAcquireShared(arg) < 0)
    //获取失败进入sync队列
    doAcquireShared(arg);
    }

void acquireSharedInterruptibly(int arg)

  • 相比acquireShared,只是增加了可中断
    1
    2
    3
    4
    5
    6
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
    }

boolean tryAcquireSharedNanos(int arg, long nanosTimeout)

  • 在acquireSharedInterruptibly的基础上增加了超时
    1
    2
    3
    4
    5
    6
    7
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
    doAcquireSharedNanos(arg, nanosTimeout);
    }

boolean releaseShared(int arg)

  • 释放共享资源
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public final boolean releaseShared(int arg) {
    //尝试释放共享资源
    if (tryReleaseShared(arg)) {
    //唤醒的过程,上文已经分析
    doReleaseShared();
    return true;
    }
    return false;
    }

boolean hasQueuedThreads()

  • 队列中是否有线程在等待获取资源
    1
    2
    3
    public final boolean hasQueuedThreads() {
    return head != tail;
    }

boolean hasContended()

  • 是否其他线程也竞争获取资源(因为head是公用的)
    1
    2
    3
    public final boolean hasContended() {
    return head != null;
    }

Thread getFirstQueuedThread()

  • 返回队列中的第一个线程,如果快速路径失败(head == tail),则调用fullGetFirstQueuedThread查找
    1
    2
    3
    4
    public final Thread getFirstQueuedThread() {
    // handle only fast path, else relay
    return (head == tail) ? null : fullGetFirstQueuedThread();
    }

Thread fullGetFirstQueuedThread()

  • 返回队列中第一个(等待时间最长的)线程,如果目前没有将任何线程加入队列,则返回 null.
  • 在此实现中,该操作是以固定时间返回的,但是,如果其他线程目前正在并发修改该队列,则可能出现循环争用。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    private Thread fullGetFirstQueuedThread() {
    Node h, s;
    Thread st;
    if (((h = head) != null && (s = h.next) != null &&
    s.prev == head && (st = s.thread) != null) ||
    ((h = head) != null && (s = h.next) != null &&
    s.prev == head && (st = s.thread) != null))
    return st;
    Node t = tail;
    Thread firstThread = null;
    while (t != null && t != head) {
    Thread tt = t.thread;
    if (tt != null)
    firstThread = tt;
    t = t.prev;
    }
    return firstThread;
    }

boolean isQueued(Thread thread)

  • 判断thread是否在队列中等待获取资源
    1
    2
    3
    4
    5
    6
    7
    8
    public final boolean isQueued(Thread thread) {
    if (thread == null)
    throw new NullPointerException();
    for (Node p = tail; p != null; p = p.prev)
    if (p.thread == thread)
    return true;
    return false;
    }

boolean apparentlyFirstQueuedIsExclusive()

  • 在head不为null,head的next不为null,head的next不是共享的,head的thread不为空的条件下返回true,否则返回false
  • 作用就是读锁不应该让写锁始终等待。
    1
    2
    3
    4
    5
    6
    7
    final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
    (s = h.next) != null &&
    !s.isShared() &&
    s.thread != null;
    }

boolean hasQueuedPredecessors()

  • 判断当前线程是不是在CLH队列的队首,来返回AQS中是不是有比当前线程等待更久的线程。
    1
    2
    3
    4
    5
    6
    7
    public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
    ((s = h.next) == null || s.thread != Thread.currentThread());
    }

int getQueueLength()

  • 获取队列长度
    1
    2
    3
    4
    5
    6
    7
    8
    public final int getQueueLength() {
    int n = 0;
    for (Node p = tail; p != null; p = p.prev) {
    if (p.thread != null)
    ++n;
    }
    return n;
    }

Collection getQueuedThreads()

  • 获取线程队列
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public final Collection<Thread> getQueuedThreads() {
    ArrayList<Thread> list = new ArrayList<Thread>();
    for (Node p = tail; p != null; p = p.prev) {
    Thread t = p.thread;
    if (t != null)
    list.add(t);
    }
    return list;
    }

Collection getExclusiveQueuedThreads()

  • 获取独占资源的线程队列
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public final Collection<Thread> getExclusiveQueuedThreads() {
    ArrayList<Thread> list = new ArrayList<Thread>();
    for (Node p = tail; p != null; p = p.prev) {
    if (!p.isShared()) {
    Thread t = p.thread;
    if (t != null)
    list.add(t);
    }
    }
    return list;
    }

Collection getSharedQueuedThreads()

  • 获取共享资源的线程队列
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public final Collection<Thread> getSharedQueuedThreads() {
    ArrayList<Thread> list = new ArrayList<Thread>();
    for (Node p = tail; p != null; p = p.prev) {
    if (p.isShared()) {
    Thread t = p.thread;
    if (t != null)
    list.add(t);
    }
    }
    return list;
    }

boolean isOnSyncQueue(Node node)

  • 判断该节点是否在CLH队列中
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    final boolean isOnSyncQueue(Node node) {
    //如果该节点的状态为CONDITION(该状态只能在CONDITION队列中出现,CLH队列中不会出现CONDITION状态),或者该节点的prev指针为null,则该节点一定不在CLH队列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
    return false;
    //如果该节点的next(不是nextWaiter,next指针在CLH队列中指向下一个节点)状态不为null,则该节点一定在CLH队列中
    if (node.next != null) // If has successor, it must be on queue
    return true;
    //遍历CLH队列(从尾节点开始遍历)查找该节点
    return findNodeFromTail(node);
    }

boolean findNodeFromTail(Node node)

  • 从tail往前寻找节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
    if (t == node)
    return true;
    if (t == null)
    return false;
    t = t.prev;
    }
    }

boolean transferForSignal(Node node)

  • 将节点添加到CLH队列中
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
      final boolean transferForSignal(Node node) {
    //如果CAS失败,则当前节点的状态为CANCELLED
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;
    //首先enq将该node添加到CLH队列中
    Node p = enq(node);
    int ws = p.waitStatus;
    //如果p是一个取消(ws > 0)了的节点,或者对p进行CAS设置失败,则唤醒node节点,让node所在线程进入到acquireQueue方法中,重新进行相关操作
    //否则,由于该节点的前驱节点已经是signal状态了,不用在此处唤醒await中的线程,唤醒工作留给CLH队列中前驱节点
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    LockSupport.unpark(node.thread);//唤醒
    return true;
    }

boolean transferAfterCancelledWait(Node node)

  • 将当前Node强制transfer到CLH队列中
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    final boolean transferAfterCancelledWait(Node node) {
    //将该节点状态由CONDITION变成0,调用enq将该节点从CONDITION队列添加到CLH队列中(但是在CONDITION队列中的nextWaiter连接并没有取消)
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
    enq(node);
    return true;
    }
    //循环检测该node是否已经成功添加到CLH队列中
    while (!isOnSyncQueue(node))
    Thread.yield();
    return false;
    }

int fullyRelease(Node node)

  • 完全释放锁,释放成功则返回,失败则将当前节点的状态设置成cancelled表示当前节点失效
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    final int fullyRelease(Node node) {
    boolean failed = true;
    try {
    int savedState = getState();
    if (release(savedState)) {
    failed = false;
    return savedState;
    } else {
    throw new IllegalMonitorStateException();
    }
    } finally {
    if (failed)
    node.waitStatus = Node.CANCELLED;//失败则当前node状态为CANCELLED
    }
    }

boolean owns(ConditionObject condition)

  • 判断条件对象拥有者
    1
    2
    3
    public final boolean owns(ConditionObject condition) {
    return condition.isOwnedBy(this);
    }

boolean hasWaiters(ConditionObject condition)

  • 条件队列是否有等待者
    1
    2
    3
    4
    5
    public final boolean hasWaiters(ConditionObject condition) {
    if (!owns(condition))
    throw new IllegalArgumentException("Not owner");
    return condition.hasWaiters();
    }

int getWaitQueueLength(ConditionObject condition)

  • 获取条件队列等待者数量
    1
    2
    3
    4
    5
    public final int getWaitQueueLength(ConditionObject condition) {
    if (!owns(condition))
    throw new IllegalArgumentException("Not owner");
    return condition.getWaitQueueLength();
    }

Collection getWaitingThreads(ConditionObject condition)

  • 获取条件队列等待者线程
    1
    2
    3
    4
    5
    public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
    if (!owns(condition))
    throw new IllegalArgumentException("Not owner");
    return condition.getWaitingThreads();
    }

辅助Field及方法

就不一一解释了

  • Cas相关Field
    • Unsafe unsafe;
    • long stateOffset;
    • long headOffset;
    • long tailOffset;
    • long waitStatusOffset;
    • long nextOffset;
  • Cas相关Method
    • boolean compareAndSetHead(Node update)
    • boolean compareAndSetTail(Node expect, Node update)
    • boolean compareAndSetWaitStatus(Node node, int expect, int update)
    • boolean compareAndSetNext(Node node, Node expect, Node update)

参考