0%

Java线程池分析-Worker

结构

其实从结构上来看,Worker十分简单。实现了Runnable接口,同时继承了AQS队列。如下图所示:
类结构

Worker的方法也不多,也比较简单,如下图所示:
方法

分析

内部Field

  • 内部Field不多,如下:
    • Thread thread 实际的工作线程
    • Runnable firstTask 初始化的第一个任务
    • long completedTasks 当前Worker已经完成的任务数
  • 在补充一下父类的state
    • 0 代表是未锁定状态
    • 1 代表是锁定状态
    • -1 代表是不允许被中断,在构造参数中设置
      接下来简单分析一下各个方法:

方法

public void run()

  • 直接调用线程池的runWorker方法,之后分析
    1
    2
    3
    public void run() {
    runWorker(this);
    }

protected boolean isHeldExclusively()

  • 是否是独占排他的
    1
    2
    3
    protected boolean isHeldExclusively() {
    return getState() != 0;
    }

protected boolean tryAcquire(int unused)

  • 尝试获取锁,参考AQS,这里还是使用CAS进行操作,失败则快速返回
    1
    2
    3
    4
    5
    6
    7
    protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
    setExclusiveOwnerThread(Thread.currentThread());
    return true;
    }
    return false;
    }

protected boolean tryRelease(int unused)

  • 尝试释放锁,这个貌似只会成功,不会失败
    1
    2
    3
    4
    5
    protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
    }

public void lock()

  • 加锁,不过这里是阻塞式的
    1
    public void lock()        { acquire(1); }

public boolean tryLock()

  • 尝试加锁,调用tryAcquire
    1
    public boolean tryLock()  { return tryAcquire(1); }

public void unlock()

  • 释放锁操作,参考AQS
    1
    public void unlock()      { release(1); }

public boolean isLocked()

  • 是否处于锁定状态,调用isHeldExclusively方法,也就是看state是否为0
    1
    public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted()

  • 如果处于运行状态,则进行中断。state>=0代表可以进行中断。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
    try {
    t.interrupt();
    } catch (SecurityException ignore) {
    }
    }
    }

关联方法

runWorker

  • Worker直接调用线程池的runWorker方法,将自身作为参数,执行任务,具体如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    final void runWorker(Worker w) {
    //当前工作线程
    Thread wt = Thread.currentThread();
    //获取待执行的初始任务
    Runnable task = w.firstTask;
    //清除firstTask
    w.firstTask = null;
    //释放w锁(这个时候可以进行中断操作)
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
    //开始循环获取任务
    while (task != null || (task = getTask()) != null) {
    //获取任务之后先进行加锁
    w.lock();
    //检查线程池状态,是否需要中断。
    //这块逻辑比较绕,整理一下
    // 首先wt也就是当前线程,不能被中断。
    // 如果线程池的状态为STOP,TIDYING,TERMINATED 则直接中断
    // (剩下的就是原文中的注释了)如果线程池正在停止过程中,确保线程是中断的。否则就确保线程不会被中断。
    if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(),STOP))) && !wt.isInterrupted())
    wt.interrupt();
    try {
    beforeExecute(wt, task);//这里实际上是空实现
    Throwable thrown = null;
    try {
    task.run();//实际执行
    } catch (RuntimeException x) {
    thrown = x; throw x;
    } catch (Error x) {
    thrown = x; throw x;
    } catch (Throwable x) {
    thrown = x; throw new Error(x);
    } finally {
    afterExecute(task, thrown);//这里也是空实现
    }
    } finally {
    task = null;
    w.completedTasks++;//执行完成后,完成任务+1
    w.unlock();//执行完成后释放锁
    }
    }
    completedAbruptly = false;
    } finally {
    //清理工作,执行到这里代表Worker已经准备销毁了
    processWorkerExit(w, completedAbruptly);
    }
    }

getTask

  • 获取任务的方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
    //先检查线程池的状态
    int c = ctl.get();
    int rs = runStateOf(c);
    //如果已经关闭并且队列为空则返回null,并减少一个工作线程
    //如果已经为STOP,TIDYING,TERMINATED 则减少一个工作线程
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    decrementWorkerCount();
    return null;
    }
    //获取工作线程数量
    int wc = workerCountOf(c);
    //判断是否需要清理Worker
    // 一种是allowCoreThreadTimeOut=true的情况
    //一种是工作线程数量已经超过核心线程数量了
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    //状态检查
    //满足以下两个条件,则会减少工作线程数量
    //1.已经超时或者工作线程数量超过最大线程数量的
    //2.至少有一个工作线程或者任务队列为空
    if ((wc > maximumPoolSize || (timed && timedOut))
    && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
    return null;
    continue;
    }
    try {
    //
    Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://带有超时的方式获取,超时之后返回null
    workQueue.take();//阻塞方式获取
    //没有超时则返回结果,否则设置超时状态
    if (r != null)
    return r;
    timedOut = true;
    } catch (InterruptedException retry) {
    timedOut = false;
    }
    }
    }

processWorkerExit

  • 做一些收尾工作
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // 还记得runWork方法中的completedAbruptly么,就是这个了,为true代表没有执行的改变为false,突然执行到这里了。
    decrementWorkerCount();//减少工作线程数量

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    completedTaskCount += w.completedTasks;//更新一下总共完成的任务
    workers.remove(w);//从Worker集合中中移除自己
    } finally {
    mainLock.unlock();
    }
    //尝试设置线程池为TERMINATED,见线程池部分分析
    //线程池为SHUTDOWN且队列为空或者线程池状态为STOP,则触发设置线程池为TERMINATED
    tryTerminate();

    int c = ctl.get();//获取当前线程池状态
    if (runStateLessThan(c, STOP)) {//是否已经停止或者终止
    //是否突然过来的,如果不是突然过来的,代表正常结束
    if (!completedAbruptly) {
    //根据allowCoreThreadTimeOut获取线程池数量最小值
    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    //队列不为空则最小值不能为0
    if (min == 0 && ! workQueue.isEmpty())
    min = 1;
    //当前工作线程数量大于等于最小值,则代表还不能结束,继续执行
    if (workerCountOf(c) >= min)
    return; // replacement not needed
    }
    //执行到这里说明当前线程数量小于min的值,需要添加一个Worker
    //或者突然执行过来的,可能有异常,添加一个Worker
    addWorker(null, false);
    }
    }