0%

Java线程池分析-ThreadPoolExecutor

线程池状态

状态

  • RUNNING
    • 该状态接受新的任务同时处理队列中的任务
  • SHUTDOWN
    • 该状态不再接受新的任务,但是队列中的任务继续执行
  • STOP
    • 不再接受新的任务,也不再处理队列中的任务,并且会中断正在运行的任务
  • TIDYING
    • 所有任务都已经终止,工作线程数为0。调用terminated()方法状态会变为TIDYING
  • TERMINATED
    • terminated()方法执行完成

状态转移

  • RUNNING -> SHUTDOWN
    • 执行shutdown()方法(SHUTDOWN状态可能立即结束进入下一状态)
  • (RUNNING or SHUTDOWN) -> STOP
    • 执行shutdownNow()方法
  • SHUTDOWN -> TIDYING
    • 当所有任务队列和线程池(pool)都空的了时候
  • STOP -> TIDYING
    • 当线程池(pool)为空的时候
  • TIDYING -> TERMINATED
    • terminated()方法完成
      awaitTermination()在状态变为TERMINATED的时候返回

状态位表示

状态位在线程池中使用一个原子类型的Integer进行存储

1
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

Integer的长度是32位,用全部32位来表示线程数量,有点浪费。线程池的状态一共就5种,所以大神决定用ctl的高3位(可以表示8种状态了),来表示线程池的状态,低29位用来计数(大约500_000_000),反正就目前机器来说,想同时开启这么多线程。。。机器早就挂了,所以足够了

几个底层依赖的方法

  • private static int runStateOf(int c) { return c & ~CAPACITY; }
    • 通过位运算,获取当前线程池的状态
  • private static int workerCountOf(int c) { return c & CAPACITY; }
    • 通过位运算,获取当前线程池的工作线程数
  • private static int ctlOf(int rs, int wc) { return rs | wc; }
    • 为了看ctl的状态
  • 然后就是几个关于ctl的cas操作,包括增加一个线程计数,减少一个线程计数

几个重要的Field

  • private final BlockingQueue workQueue;
    • 关键参数,设置线程池corePoolSize满了以后,要将任务放到什么样的阻塞队列中。
  • private final ReentrantLock mainLock = new ReentrantLock();
    • 内部锁,在很多方法中均有用到,包括添加Worker,中断Worker,shutdown,以及获取各种size都需要锁进行同步保护
  • private final HashSet workers = new HashSet();
    • 可以理解为线程的集合,Worker 继承了 AQS 并且实现了 Runnable,也就是线程池中对应的线程的集合。
  • private final Condition termination = mainLock.newCondition();
    • 在 tryTerminate() 方法中进行通知,awaitTermination(long timeout, TimeUnit unit)方法中进行awaitNanos,也就是说调用awaitTermination方法之后,会一直等待,直到tryTerminate方法执行并且通知,才会结束(这时候线程池应该就变为TERMINATED状态了)
  • private int largestPoolSize;
    • 记录线程池中线程数量曾经达到过的最大值。
  • private long completedTaskCount;
    • 这个从字面意思就很好立即了,已经完成的任务数量
  • private volatile ThreadFactory threadFactory;
    • 又一个核心参数,线程的创建工厂,各种大厂的Java开发规范都需要业务自己实现对应的线程工厂,定义线程的名称之类的,主要是后期排查多线程问题的时候方便定位。
  • private volatile RejectedExecutionHandler handler;
    • 也是核心参数,当线程池达到饱和状态(队列满了,maximumPoolSize也达到了),如果还在继续提交的任务,就依靠这个进行处理。
    • 默认有4个实现类,如下:
      • CallerRunsPolicy,这个就是将任务返回给调用方,由调用方执行。
      • AbortPolicy,这个比较粗暴,直接抛出异常。也是默认实现,参考defaultHandler
      • DiscardPolicy,这个应该很少用,直接丢弃提交的任务,Do Nothing
      • DiscardOldestPolicy,这个也是丢弃,不过是丢弃队列中最早的一个(直接调用peeK)
  • private volatile long keepAliveTime;
    • 核心参数之一,线程空闲多久后回收(默认如果小于corePoolSize,则不进行回收)
  • private volatile boolean allowCoreThreadTimeOut;
    • allowCoreThreadTimeOut,core线程是否超时后回收,默认是false
  • private volatile int corePoolSize;
    • 核心参数,core线程池大小
  • private volatile int maximumPoolSize;
    • 核心参数,最大线程池大小(超过这个值就会调用RejectedExecutionHandler)
  • private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
    • 默认Rejected处理策略,抛出异常
  • private static final RuntimePermission shutdownPerm = new RuntimePermission(“modifyThread”);
    • 这个真不太清楚,只知道是在调用shutdown相关方法会调用进行安全检查
  • private final AccessControlContext acc;
    • 在finalize方法中有调用,看不太明白 = =,貌似也是权限访问层面的(AccessController.doPrivileged(pa, acc);)

几个重要的 public 方法

  • public void execute(Runnable command)

    • 提交一个Runnable任务,描述很简单,实现应该是所有方法中最复杂的了,话不多说,直接上源码。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      public void execute(Runnable command) {
      if (command == null)
      throw new NullPointerException();//不允许提交null
      int c = ctl.get();
      //检查目前工作线程数量是否超过了corePoolSize,没有超过的话直接添加任务,添加成功就返回,添加失败则更新状态值c然后继续
      if (workerCountOf(c) < corePoolSize) {
      if (addWorker(command, true))//参数为true则添加的是core线程
      return;
      c = ctl.get();
      }
      //执行到这里肯定是添加worker失败或者已经达到了corePoolSize
      //这时候检查线程状态,确保是Running(因为有可能这期间其他线程调用了shutdown等方法),就开始往队列中添加任务。
      if (isRunning(c) && workQueue.offer(command)) {
      int recheck = ctl.get();
      //添加到队列之后,再次检查线程池状态,如果状态发生变化,则移除任务并执行拒绝策略
      //如果状态没有发生改变,此时如果线程池为空,那就添加一个非核心Worker
      if (! isRunning(recheck) && remove(command))
      reject(command);
      else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
      }//到这里说明添加队列失败,要么是线程池编程非RUNNING状态,要么队列满了,则添加非核心线程,非核心线程如果添加还是失败了,就只能执行拒绝策略了
      else if (!addWorker(command, false))
      reject(command);
      }
      //上面的步骤还不算复杂,接下就是最复杂的addWorker方法了。
      private boolean addWorker(Runnable firstTask, boolean core) {
      retry: //标记位,用于循环控制,也就是外层循环
      for (;;) {
      int c = ctl.get();
      int rs = runStateOf(c);//当前线程池状态
      //如果线程池状态如下,则返回添加失败
      // 1. 线程池状态为STOP,TIDYING,TERMINATED中的一个
      // 2. 线程池状态为SHUTDOWN,并且第一个任务不为空
      // 3. 线程池状态为SHUTDOWN,并且工作队列为空
      if (rs >= SHUTDOWN &&
      ! (rs == SHUTDOWN &&
      firstTask == null &&
      ! workQueue.isEmpty()))
      return false;
      //开始内层循环,执行到这里说明不是上述123中的状态。
      for (;;) {
      int wc = workerCountOf(c);//获取工作线程数量
      //检查是否达到上限,并根据添加的线程类别(core或者非core)判断是否超过对应的最大值,超过也返回false
      if (wc >= CAPACITY ||
      wc >= (core ? corePoolSize : maximumPoolSize))
      return false;
      if (compareAndIncrementWorkerCount(c))//上述校验通过以后,CAS方式增加一个工作线程,如果成功了,则跳出外层循环
      break retry;
      //执行到这里说明cas方式增加线程失败,那就重新检查一下线程池状态,然后内层循环继续,直到增加成功。
      c = ctl.get(); // Re-read ctl
      if (runStateOf(c) != rs)
      continue retry;
      // else CAS failed due to workerCount change; retry inner loop
      }
      }
      //执行到这里,说明已经成功增加了一个线程计数了。
      boolean workerStarted = false;
      boolean workerAdded = false;
      Worker w = null;
      try {
      w = new Worker(firstTask);//使用第一个任务创建一个Worker
      final Thread t = w.thread;//获取对应worker的线程
      if (t != null) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      // Recheck while holding lock.
      // Back out on ThreadFactory failure or if
      // shut down before lock acquired.
      int rs = runStateOf(ctl.get());//检查线程池的状态
      //线程池状态是运行的或者刚刚关闭,第一个任务尚未赋值
      if (rs < SHUTDOWN ||
      (rs == SHUTDOWN && firstTask == null)) {
      //这里确保线程还没有被其他线程启动
      if (t.isAlive()) // precheck that t is startable
      throw new IllegalThreadStateException();
      //添加worker到workers集合中
      workers.add(w);
      int s = workers.size();
      //对比并更新最大到达数量
      if (s > largestPoolSize)
      largestPoolSize = s;
      workerAdded = true;//表示已经增加了worker
      }
      } finally {
      mainLock.unlock();
      }
      //如果已经成功增加了worker,就可以启动对应的线程了。
      if (workerAdded) {
      t.start();
      workerStarted = true;
      }
      }
      } finally {
      //这里检查一下worker是否已经启动成功,如果没有启动成功,则执行添加失败操作
      if (! workerStarted)
      addWorkerFailed(w);
      }
      return workerStarted;//返回工作线程是否启动成功
      }
      //简单看一下添加失败的逻辑
      private void addWorkerFailed(Worker w) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      if (w != null)
      workers.remove(w);//添加失败则移除掉Set集合中对应的worker
      //并且减少一个工作线程数量计数
      decrementWorkerCount();
      //尝试关闭线程池(感觉这里是为了释放空闲线程)
      tryTerminate();
      } finally {
      mainLock.unlock();
      }
      }
  • public void shutdown()

    • 尝试关闭线程池,执行后状态变为SHUTDOWN,继续完成已经提交的任务,但是新的任务不再被接受。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      public void shutdown() {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      checkShutdownAccess();//检查访问权?(这块有点懵)
      advanceRunState(SHUTDOWN);//检查是否可以设置为SHUTDOWN并通过CAS操作设置为SHUTDOWN
      interruptIdleWorkers();//中断线程
      onShutdown(); // hook for ScheduledThreadPoolExecutor ,这里是空实现
      } finally {
      mainLock.unlock();
      }
      tryTerminate();//检测并尝试终止线程池
      }
    • 几个内部调用的方法方法
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
          private void advanceRunState(int targetState) {
      for (;;) {
      int c = ctl.get();
      //传参数为SHUTDOWN,则runStateAtLeast在SHUTDOWN、STOP、TIDYING和TERMINATED状态的时候为true,直接跳出循环。如果线程池状态为RUNNING,则进行CAS操作,更新状态位为SHUTDOWN,成功则结束。
      if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
      break;
      }
      }

      private void interruptIdleWorkers() {
      interruptIdleWorkers(false);//中断所有线程,如果参数为true,则仅中断一个线程,具体见下
      }

      private void interruptIdleWorkers(boolean onlyOne) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      for (Worker w : workers) {
      Thread t = w.thread;
      //线程处于非中断状态,并且没有其他线程中断该线程(也就是说只能有一个线程进行中断操作)
      if (!t.isInterrupted() && w.tryLock()) {
      try {
      t.interrupt();//依次中断
      } catch (SecurityException ignore) {
      } finally {
      w.unlock();
      }
      }
      if (onlyOne)//如果为true,则中断一个后跳出循环
      break;
      }
      } finally {
      mainLock.unlock();
      }
      }
      //一个空实现
      void onShutdown() {

      }

      final void tryTerminate() {
      for (;;) {
      int c = ctl.get();
      if (isRunning(c) ||//正在运行中,不能设置为TIDYING
      runStateAtLeast(c, TIDYING) ||//线程池为TIDYING或者TERMINATED,其他线程已经开始关闭线程池
      (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))//队列中尚有需要执行的任务,需要等待执行完成,不能设置为TIDYING
      return;
      //上面一坨条件判断说白了就是在等线程池状态为SHUTDOWN并且队列为空或者线程池状态为STOP才会继续,否则放弃终止操作。

      if (workerCountOf(c) != 0) { // Eligible to terminate
      interruptIdleWorkers(ONLY_ONE);//ONLY_ONE在这里为true,也就是仅仅中断一个空闲的worker。
      //补充一下:interruptIdleWorkers的作用是因为在getTask方法中执行workQueue.take()时,如果不执行中断会一直阻塞。在shutdown方法中,会中断所有空闲的工作线程,如果在执行shutdown时工作线程没有空闲,然后又去调用了getTask方法,这时如果workQueue中没有任务了,调用workQueue.take()时就会一直阻塞。所以每次在工作线程结束时调用tryTerminate方法来尝试中断一个空闲工作线程,避免在队列为空时取任务一直阻塞的情况。(引用自:https://www.cnblogs.com/liuzhihu/p/8177371.html)
      return;
      }

      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//CAS操作将线程池设置为TIDYING状态
      try {
      terminated();//执行一些清理操作
      } finally {
      ctl.set(ctlOf(TERMINATED, 0));//设置线程池状态为TERMINATED
      termination.signalAll();//通知条件变量termination(也就是awaitTermination方法可以继续执行了)
      }
      return;
      }
      } finally {
      mainLock.unlock();
      }
      // else retry on failed CAS
      }
      }

  • public List shutdownNow()

    • 立即关闭线程池,设置状态为STOP,不再接受新的任务,并且中断正在运行的任务,返回队列中未执行的任务。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      public List<Runnable> shutdownNow() {
      List<Runnable> tasks;
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      checkShutdownAccess();//检查访问权?(这块有点懵)
      advanceRunState(STOP);//cas方式设置状态为STOP
      interruptWorkers();//中断正在运行的线程
      tasks = drainQueue();//将队列中未执行的任务返回
      } finally {
      mainLock.unlock();
      }
      tryTerminate();//检测并尝试终止线程池
      return tasks;
      }
    • 看一下里面具体的方法,checkShutdownAccess、advanceRunState和tryTerminate见上
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      private void interruptWorkers() {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      for (Worker w : workers)//遍历workers集合
      w.interruptIfStarted();//依次中断
      } finally {
      mainLock.unlock();
      }
      }
      Worker的interruptIfStarted方法如下:
      void interruptIfStarted() {
      Thread t;
      //state小于0是不可中断的标识,只能在大于等于0的时候进行中断
      if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try {
      t.interrupt();
      } catch (SecurityException ignore) {
      }
      }
      }
      //转移队列剩余任务方法
      private List<Runnable> drainQueue() {
      BlockingQueue<Runnable> q = workQueue;
      ArrayList<Runnable> taskList = new ArrayList<Runnable>();
      q.drainTo(taskList);//将q的所有任务转移到taskList中
      if (!q.isEmpty()) {//上一步操作可能会失败,再次检查(什么情况会失败?这块我也没太明白)
      for (Runnable r : q.toArray(new Runnable[0])) {
      if (q.remove(r))
      taskList.add(r);
      }
      }
      return taskList;
      }
  • public boolean isShutdown()

    • 通过ctl获取线程池的状态,没啥可说的
  • public boolean isTerminating()

    • 通过ctl获取线程池的状态,只要不是运行的,不是TERMINATED,就处于这个状态
  • public boolean isTerminated()

    • 通过ctl获取线程池的状态,并且状态是TERMINATED
  • public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException

    • 带超时的等待线程池状态变为TERMINATED
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      public boolean awaitTermination(long timeout, TimeUnit unit)
      throws InterruptedException {
      long nanos = unit.toNanos(timeout);
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      for (;;) {
      if (runStateAtLeast(ctl.get(), TERMINATED))//CAS循环判断状态是否为TERMINATED
      return true;
      if (nanos <= 0)//超时则返回false
      return false;
      nanos = termination.awaitNanos(nanos);//还记得上面的termination.signalAll()吧,就是这里等待通知
      }
      } finally {
      mainLock.unlock();
      }
      }
  • public boolean prestartCoreThread()

    • 调用该方法,则执行启动一个核心线程,源码比较简单,只要不足corePoolSize,就执行增加操作。
      1
      2
      3
      4
      public boolean prestartCoreThread() {
      return workerCountOf(ctl.get()) < corePoolSize &&
      addWorker(null, true);
      }
  • public int prestartAllCoreThreads()

    • 一次性启动Core工作线程到corePoolSize,返回启动了几个线程。
      1
      2
      3
      4
      5
      6
      public int prestartAllCoreThreads() {
      int n = 0;
      while (addWorker(null, true))
      ++n;
      return n;
      }
  • public boolean remove(Runnable task)

    • 移除一个task,移除操作之后会执行tryTerminate()方法
      1
      2
      3
      4
      5
      6
      public boolean remove(Runnable task) {
      boolean removed = workQueue.remove(task);
      //会检查线程池状态,并根据状态决定是否终止线程池(特别针对的场景就是shudown状态+空队列,就可以进入下一个状态)
      tryTerminate(); // In case SHUTDOWN and now empty
      return removed;
      }
  • public void purge()

    • 尝试从队列中移除所有已经取消了的Future任务
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      public void purge() {
      final BlockingQueue<Runnable> q = workQueue;
      try {
      Iterator<Runnable> it = q.iterator();
      while (it.hasNext()) {
      Runnable r = it.next();
      //移除掉已经取消的Future任务
      if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
      it.remove();
      }
      } catch (ConcurrentModificationException fallThrough) {//遍历过程中发生了其他修改,快速失败采用数组快照方式删除
      // Take slow path if we encounter interference during traversal.
      // Make copy for traversal and call remove for cancelled entries.
      // The slow path is more likely to be O(N*N).
      for (Object r : q.toArray())
      if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
      q.remove(r);
      }
      //会检查线程池状态,并根据状态决定是否终止线程池(特别针对的场景就是shudown状态+空队列,就可以进入下一个状态)
      tryTerminate(); // In case SHUTDOWN and now empty
      }

待分析内容

简单概述

  • 上面这一堆只是简单分析了一下ThreadPoolExecutor内部的方法,这个类继承了AbstractExecutorService,之后会在分析一下AbstractExecutorService,此外ThreadPoolExecutor内部还有一个Worker类,它承了AbstractQueuedSynchronizer(AQS),这两个也是后面要分析的重点。尤其是AQS,后面的各种锁相关都是依赖于它

TODO项

  • AbstractExecutorService
  • Worker
  • AbstractQueuedSynchronizer