public boolean hasWaiters(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition); }
getWaitQueueLength(Condition condition)
阻塞在condition的await()的方法上的线程数量
1 2 3 4 5 6 7
public int getWaitQueueLength(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition); }
getWaitingThreads(Condition condition)
阻塞在condition的await()的方法上的线程集合
1 2 3 4 5 6 7
protected Collection<Thread> getWaitingThreads(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread();//当前线程 int c = getState();//获取加锁状态 if (c == 0) {//为0则代表还没有上锁 if (compareAndSetState(0, acquires)) {//执行CAS操作并且上锁 setExclusiveOwnerThread(current);//设置持有锁的线程 return true;//加锁成功 } } else if (current == getExclusiveOwnerThread()) {//执行到这里说明已经有某个线程获取到锁了,因为是可重入锁,判断持有锁的线程是否为当前线程 int nextc = c + acquires;//执行到这里说明是已经不是第一次上锁,并且当前线程是锁的持有线程,则可以直接进行累加(也就是重入) if (nextc < 0) // 额,超过int的最大值,出现溢出了(真的存在这种场景么= =??) throw new Error("Maximum lock count exceeded"); setState(nextc);//更新state return true; } return false;//获取失败 }
boolean tryRelease(int releases)
非阻塞方式尝试释放资源,具体看源码分析
1 2 3 4 5 6 7 8 9 10 11 12
protected final boolean tryRelease(int releases) { int c = getState() - releases;//待更新资源 if (Thread.currentThread() != getExclusiveOwnerThread())//判断是否为锁的持有现成 throw new IllegalMonitorStateException(); boolean free = false;//释放标识位置。为true则代表当前线程不再持有当前锁的任何资源 if (c == 0) {//如果释放资源后,资源数量为0,代表释放锁,其他线程可以尝试获取锁,如果不为0,则需要继续释放(因为是重入多次,需要释放多次) free = true; setExclusiveOwnerThread(null);//清空锁持有线程 } setState(c);//更新状态标志位 return free; }
boolean isHeldExclusively()
判断当前线程是否为锁持有线程
1 2 3
protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); }
ConditionObject newCondition()
创建条件变量对象,ConditionObject之后分析
1 2 3
final ConditionObject newCondition() { return new ConditionObject(); }
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
boolean tryAcquire(int arg)
AQS没有提供具体实现,需要子类实现
1 2 3
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
boolean tryRelease(int arg)
AQS没有提供具体实现,需要子类实现
1 2 3
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
int tryAcquireShared(int arg)
AQS没有提供具体实现,需要子类实现
1 2 3
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
boolean tryReleaseShared(int arg)
AQS没有提供具体实现,需要子类实现
1 2 3
protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
boolean isHeldExclusively()
AQS没有提供具体实现,需要子类实现
1 2 3
protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
void acquire(int arg)
以独占的方式去获取资源,忽略中断。
1 2 3 4 5
public final void acquire(int arg) { //至少执行一次tryAcquire,成功则返回,失败则进行线程阻塞状态,等待唤醒重新获取资源 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
void acquireInterruptibly(int arg)
以独占的方式去获取资源,等待期间会被中断。如果线程本身已经被中断,调用该方法会立即抛出异常
1 2 3 4 5 6
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
boolean tryAcquireNanos(int arg, long nanosTimeout)
带有超时的获取独占资源,也会抛出中断异常
1 2 3 4 5 6
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
boolean release(int arg)
资源释放
1 2 3 4 5 6 7 8 9 10
public final boolean release(int arg) { //保证原子方式释放资源,同一时刻只能有一个线程成功 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//唤醒当前节点的后继节点所包含的线程 return true; } return false; }
void acquireShared(int arg)
以共享模式获取状态
1 2 3 4 5 6
public final void acquireShared(int arg) { //尝试获取共享状态 if (tryAcquireShared(arg) < 0) //获取失败进入sync队列 doAcquireShared(arg); }
void acquireSharedInterruptibly(int arg)
相比acquireShared,只是增加了可中断
1 2 3 4 5 6
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
在acquireSharedInterruptibly的基础上增加了超时
1 2 3 4 5 6 7
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
boolean releaseShared(int arg)
释放共享资源
1 2 3 4 5 6 7 8 9
public final boolean releaseShared(int arg) { //尝试释放共享资源 if (tryReleaseShared(arg)) { //唤醒的过程,上文已经分析 doReleaseShared(); return true; } return false; }
boolean hasQueuedThreads()
队列中是否有线程在等待获取资源
1 2 3
public final boolean hasQueuedThreads() { return head != tail; }
boolean hasContended()
是否其他线程也竞争获取资源(因为head是公用的)
1 2 3
public final boolean hasContended() { return head != null; }
private Thread fullGetFirstQueuedThread() { Node h, s; Thread st; if (((h = head) != null && (s = h.next) != null && s.prev == head && (st = s.thread) != null) || ((h = head) != null && (s = h.next) != null && s.prev == head && (st = s.thread) != null)) return st; Node t = tail; Thread firstThread = null; while (t != null && t != head) { Thread tt = t.thread; if (tt != null) firstThread = tt; t = t.prev; } return firstThread; }
boolean isQueued(Thread thread)
判断thread是否在队列中等待获取资源
1 2 3 4 5 6 7 8
public final boolean isQueued(Thread thread) { if (thread == null) throw new NullPointerException(); for (Node p = tail; p != null; p = p.prev) if (p.thread == thread) return true; return false; }
public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
int getQueueLength()
获取队列长度
1 2 3 4 5 6 7 8
public final int getQueueLength() { int n = 0; for (Node p = tail; p != null; p = p.prev) { if (p.thread != null) ++n; } return n; }
Collection getQueuedThreads()
获取线程队列
1 2 3 4 5 6 7 8 9
public final Collection<Thread> getQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { Thread t = p.thread; if (t != null) list.add(t); } return list; }
Collection getExclusiveQueuedThreads()
获取独占资源的线程队列
1 2 3 4 5 6 7 8 9 10 11
public final Collection<Thread> getExclusiveQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { if (!p.isShared()) { Thread t = p.thread; if (t != null) list.add(t); } } return list; }
Collection getSharedQueuedThreads()
获取共享资源的线程队列
1 2 3 4 5 6 7 8 9 10 11
public final Collection<Thread> getSharedQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { if (p.isShared()) { Thread t = p.thread; if (t != null) list.add(t); } } return list; }
boolean isOnSyncQueue(Node node)
判断该节点是否在CLH队列中
1 2 3 4 5 6 7 8 9 10
final boolean isOnSyncQueue(Node node) { //如果该节点的状态为CONDITION(该状态只能在CONDITION队列中出现,CLH队列中不会出现CONDITION状态),或者该节点的prev指针为null,则该节点一定不在CLH队列中 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //如果该节点的next(不是nextWaiter,next指针在CLH队列中指向下一个节点)状态不为null,则该节点一定在CLH队列中 if (node.next != null) // If has successor, it must be on queue return true; //遍历CLH队列(从尾节点开始遍历)查找该节点 return findNodeFromTail(node); }
boolean findNodeFromTail(Node node)
从tail往前寻找节点
1 2 3 4 5 6 7 8 9 10
private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
boolean transferForSignal(Node node)
将节点添加到CLH队列中
1 2 3 4 5 6 7 8 9 10 11 12 13
final boolean transferForSignal(Node node) { //如果CAS失败,则当前节点的状态为CANCELLED if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //首先enq将该node添加到CLH队列中 Node p = enq(node); int ws = p.waitStatus; //如果p是一个取消(ws > 0)了的节点,或者对p进行CAS设置失败,则唤醒node节点,让node所在线程进入到acquireQueue方法中,重新进行相关操作 //否则,由于该节点的前驱节点已经是signal状态了,不用在此处唤醒await中的线程,唤醒工作留给CLH队列中前驱节点 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread);//唤醒 return true; }
boolean transferAfterCancelledWait(Node node)
将当前Node强制transfer到CLH队列中
1 2 3 4 5 6 7 8 9 10 11
final boolean transferAfterCancelledWait(Node node) { //将该节点状态由CONDITION变成0,调用enq将该节点从CONDITION队列添加到CLH队列中(但是在CONDITION队列中的nextWaiter连接并没有取消) if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } //循环检测该node是否已经成功添加到CLH队列中 while (!isOnSyncQueue(node)) Thread.yield(); return false; }
int fullyRelease(Node node)
完全释放锁,释放成功则返回,失败则将当前节点的状态设置成cancelled表示当前节点失效
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED;//失败则当前node状态为CANCELLED } }
boolean owns(ConditionObject condition)
判断条件对象拥有者
1 2 3
public final boolean owns(ConditionObject condition) { return condition.isOwnedBy(this); }
boolean hasWaiters(ConditionObject condition)
条件队列是否有等待者
1 2 3 4 5
public final boolean hasWaiters(ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner"); return condition.hasWaiters(); }
int getWaitQueueLength(ConditionObject condition)
获取条件队列等待者数量
1 2 3 4 5
public final int getWaitQueueLength(ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner"); return condition.getWaitQueueLength(); }
public final Collection<Thread> getWaitingThreads(ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner"); return condition.getWaitingThreads(); }
int c = ctl.get();//获取当前线程池状态 if (runStateLessThan(c, STOP)) {//是否已经停止或者终止 //是否突然过来的,如果不是突然过来的,代表正常结束 if (!completedAbruptly) { //根据allowCoreThreadTimeOut获取线程池数量最小值 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //队列不为空则最小值不能为0 if (min == 0 && ! workQueue.isEmpty()) min = 1; //当前工作线程数量大于等于最小值,则代表还不能结束,继续执行 if (workerCountOf(c) >= min) return; // replacement not needed } //执行到这里说明当前线程数量小于min的值,需要添加一个Worker //或者突然执行过来的,可能有异常,添加一个Worker addWorker(null, false); } }