0%

线程池状态

状态

  • RUNNING
    • 该状态接受新的任务同时处理队列中的任务
  • SHUTDOWN
    • 该状态不再接受新的任务,但是队列中的任务继续执行
  • STOP
    • 不再接受新的任务,也不再处理队列中的任务,并且会中断正在运行的任务
  • TIDYING
    • 所有任务都已经终止,工作线程数为0。调用terminated()方法状态会变为TIDYING
  • TERMINATED
    • terminated()方法执行完成

状态转移

  • RUNNING -> SHUTDOWN
    • 执行shutdown()方法(SHUTDOWN状态可能立即结束进入下一状态)
  • (RUNNING or SHUTDOWN) -> STOP
    • 执行shutdownNow()方法
  • SHUTDOWN -> TIDYING
    • 当所有任务队列和线程池(pool)都空的了时候
  • STOP -> TIDYING
    • 当线程池(pool)为空的时候
  • TIDYING -> TERMINATED
    • terminated()方法完成
      awaitTermination()在状态变为TERMINATED的时候返回

状态位表示

状态位在线程池中使用一个原子类型的Integer进行存储

1
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

Integer的长度是32位,用全部32位来表示线程数量,有点浪费。线程池的状态一共就5种,所以大神决定用ctl的高3位(可以表示8种状态了),来表示线程池的状态,低29位用来计数(大约500_000_000),反正就目前机器来说,想同时开启这么多线程。。。机器早就挂了,所以足够了

几个底层依赖的方法

  • private static int runStateOf(int c) { return c & ~CAPACITY; }
    • 通过位运算,获取当前线程池的状态
  • private static int workerCountOf(int c) { return c & CAPACITY; }
    • 通过位运算,获取当前线程池的工作线程数
  • private static int ctlOf(int rs, int wc) { return rs | wc; }
    • 为了看ctl的状态
  • 然后就是几个关于ctl的cas操作,包括增加一个线程计数,减少一个线程计数

几个重要的Field

  • private final BlockingQueue workQueue;
    • 关键参数,设置线程池corePoolSize满了以后,要将任务放到什么样的阻塞队列中。
  • private final ReentrantLock mainLock = new ReentrantLock();
    • 内部锁,在很多方法中均有用到,包括添加Worker,中断Worker,shutdown,以及获取各种size都需要锁进行同步保护
  • private final HashSet workers = new HashSet();
    • 可以理解为线程的集合,Worker 继承了 AQS 并且实现了 Runnable,也就是线程池中对应的线程的集合。
  • private final Condition termination = mainLock.newCondition();
    • 在 tryTerminate() 方法中进行通知,awaitTermination(long timeout, TimeUnit unit)方法中进行awaitNanos,也就是说调用awaitTermination方法之后,会一直等待,直到tryTerminate方法执行并且通知,才会结束(这时候线程池应该就变为TERMINATED状态了)
  • private int largestPoolSize;
    • 记录线程池中线程数量曾经达到过的最大值。
  • private long completedTaskCount;
    • 这个从字面意思就很好立即了,已经完成的任务数量
  • private volatile ThreadFactory threadFactory;
    • 又一个核心参数,线程的创建工厂,各种大厂的Java开发规范都需要业务自己实现对应的线程工厂,定义线程的名称之类的,主要是后期排查多线程问题的时候方便定位。
  • private volatile RejectedExecutionHandler handler;
    • 也是核心参数,当线程池达到饱和状态(队列满了,maximumPoolSize也达到了),如果还在继续提交的任务,就依靠这个进行处理。
    • 默认有4个实现类,如下:
      • CallerRunsPolicy,这个就是将任务返回给调用方,由调用方执行。
      • AbortPolicy,这个比较粗暴,直接抛出异常。也是默认实现,参考defaultHandler
      • DiscardPolicy,这个应该很少用,直接丢弃提交的任务,Do Nothing
      • DiscardOldestPolicy,这个也是丢弃,不过是丢弃队列中最早的一个(直接调用peeK)
  • private volatile long keepAliveTime;
    • 核心参数之一,线程空闲多久后回收(默认如果小于corePoolSize,则不进行回收)
  • private volatile boolean allowCoreThreadTimeOut;
    • allowCoreThreadTimeOut,core线程是否超时后回收,默认是false
  • private volatile int corePoolSize;
    • 核心参数,core线程池大小
  • private volatile int maximumPoolSize;
    • 核心参数,最大线程池大小(超过这个值就会调用RejectedExecutionHandler)
  • private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
    • 默认Rejected处理策略,抛出异常
  • private static final RuntimePermission shutdownPerm = new RuntimePermission(“modifyThread”);
    • 这个真不太清楚,只知道是在调用shutdown相关方法会调用进行安全检查
  • private final AccessControlContext acc;
    • 在finalize方法中有调用,看不太明白 = =,貌似也是权限访问层面的(AccessController.doPrivileged(pa, acc);)

几个重要的 public 方法

  • public void execute(Runnable command)

    • 提交一个Runnable任务,描述很简单,实现应该是所有方法中最复杂的了,话不多说,直接上源码。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      public void execute(Runnable command) {
      if (command == null)
      throw new NullPointerException();//不允许提交null
      int c = ctl.get();
      //检查目前工作线程数量是否超过了corePoolSize,没有超过的话直接添加任务,添加成功就返回,添加失败则更新状态值c然后继续
      if (workerCountOf(c) < corePoolSize) {
      if (addWorker(command, true))//参数为true则添加的是core线程
      return;
      c = ctl.get();
      }
      //执行到这里肯定是添加worker失败或者已经达到了corePoolSize
      //这时候检查线程状态,确保是Running(因为有可能这期间其他线程调用了shutdown等方法),就开始往队列中添加任务。
      if (isRunning(c) && workQueue.offer(command)) {
      int recheck = ctl.get();
      //添加到队列之后,再次检查线程池状态,如果状态发生变化,则移除任务并执行拒绝策略
      //如果状态没有发生改变,此时如果线程池为空,那就添加一个非核心Worker
      if (! isRunning(recheck) && remove(command))
      reject(command);
      else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
      }//到这里说明添加队列失败,要么是线程池编程非RUNNING状态,要么队列满了,则添加非核心线程,非核心线程如果添加还是失败了,就只能执行拒绝策略了
      else if (!addWorker(command, false))
      reject(command);
      }
      //上面的步骤还不算复杂,接下就是最复杂的addWorker方法了。
      private boolean addWorker(Runnable firstTask, boolean core) {
      retry: //标记位,用于循环控制,也就是外层循环
      for (;;) {
      int c = ctl.get();
      int rs = runStateOf(c);//当前线程池状态
      //如果线程池状态如下,则返回添加失败
      // 1. 线程池状态为STOP,TIDYING,TERMINATED中的一个
      // 2. 线程池状态为SHUTDOWN,并且第一个任务不为空
      // 3. 线程池状态为SHUTDOWN,并且工作队列为空
      if (rs >= SHUTDOWN &&
      ! (rs == SHUTDOWN &&
      firstTask == null &&
      ! workQueue.isEmpty()))
      return false;
      //开始内层循环,执行到这里说明不是上述123中的状态。
      for (;;) {
      int wc = workerCountOf(c);//获取工作线程数量
      //检查是否达到上限,并根据添加的线程类别(core或者非core)判断是否超过对应的最大值,超过也返回false
      if (wc >= CAPACITY ||
      wc >= (core ? corePoolSize : maximumPoolSize))
      return false;
      if (compareAndIncrementWorkerCount(c))//上述校验通过以后,CAS方式增加一个工作线程,如果成功了,则跳出外层循环
      break retry;
      //执行到这里说明cas方式增加线程失败,那就重新检查一下线程池状态,然后内层循环继续,直到增加成功。
      c = ctl.get(); // Re-read ctl
      if (runStateOf(c) != rs)
      continue retry;
      // else CAS failed due to workerCount change; retry inner loop
      }
      }
      //执行到这里,说明已经成功增加了一个线程计数了。
      boolean workerStarted = false;
      boolean workerAdded = false;
      Worker w = null;
      try {
      w = new Worker(firstTask);//使用第一个任务创建一个Worker
      final Thread t = w.thread;//获取对应worker的线程
      if (t != null) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      // Recheck while holding lock.
      // Back out on ThreadFactory failure or if
      // shut down before lock acquired.
      int rs = runStateOf(ctl.get());//检查线程池的状态
      //线程池状态是运行的或者刚刚关闭,第一个任务尚未赋值
      if (rs < SHUTDOWN ||
      (rs == SHUTDOWN && firstTask == null)) {
      //这里确保线程还没有被其他线程启动
      if (t.isAlive()) // precheck that t is startable
      throw new IllegalThreadStateException();
      //添加worker到workers集合中
      workers.add(w);
      int s = workers.size();
      //对比并更新最大到达数量
      if (s > largestPoolSize)
      largestPoolSize = s;
      workerAdded = true;//表示已经增加了worker
      }
      } finally {
      mainLock.unlock();
      }
      //如果已经成功增加了worker,就可以启动对应的线程了。
      if (workerAdded) {
      t.start();
      workerStarted = true;
      }
      }
      } finally {
      //这里检查一下worker是否已经启动成功,如果没有启动成功,则执行添加失败操作
      if (! workerStarted)
      addWorkerFailed(w);
      }
      return workerStarted;//返回工作线程是否启动成功
      }
      //简单看一下添加失败的逻辑
      private void addWorkerFailed(Worker w) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      if (w != null)
      workers.remove(w);//添加失败则移除掉Set集合中对应的worker
      //并且减少一个工作线程数量计数
      decrementWorkerCount();
      //尝试关闭线程池(感觉这里是为了释放空闲线程)
      tryTerminate();
      } finally {
      mainLock.unlock();
      }
      }
  • public void shutdown()

    • 尝试关闭线程池,执行后状态变为SHUTDOWN,继续完成已经提交的任务,但是新的任务不再被接受。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      public void shutdown() {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      checkShutdownAccess();//检查访问权?(这块有点懵)
      advanceRunState(SHUTDOWN);//检查是否可以设置为SHUTDOWN并通过CAS操作设置为SHUTDOWN
      interruptIdleWorkers();//中断线程
      onShutdown(); // hook for ScheduledThreadPoolExecutor ,这里是空实现
      } finally {
      mainLock.unlock();
      }
      tryTerminate();//检测并尝试终止线程池
      }
    • 几个内部调用的方法方法
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
          private void advanceRunState(int targetState) {
      for (;;) {
      int c = ctl.get();
      //传参数为SHUTDOWN,则runStateAtLeast在SHUTDOWN、STOP、TIDYING和TERMINATED状态的时候为true,直接跳出循环。如果线程池状态为RUNNING,则进行CAS操作,更新状态位为SHUTDOWN,成功则结束。
      if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
      break;
      }
      }

      private void interruptIdleWorkers() {
      interruptIdleWorkers(false);//中断所有线程,如果参数为true,则仅中断一个线程,具体见下
      }

      private void interruptIdleWorkers(boolean onlyOne) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      for (Worker w : workers) {
      Thread t = w.thread;
      //线程处于非中断状态,并且没有其他线程中断该线程(也就是说只能有一个线程进行中断操作)
      if (!t.isInterrupted() && w.tryLock()) {
      try {
      t.interrupt();//依次中断
      } catch (SecurityException ignore) {
      } finally {
      w.unlock();
      }
      }
      if (onlyOne)//如果为true,则中断一个后跳出循环
      break;
      }
      } finally {
      mainLock.unlock();
      }
      }
      //一个空实现
      void onShutdown() {

      }

      final void tryTerminate() {
      for (;;) {
      int c = ctl.get();
      if (isRunning(c) ||//正在运行中,不能设置为TIDYING
      runStateAtLeast(c, TIDYING) ||//线程池为TIDYING或者TERMINATED,其他线程已经开始关闭线程池
      (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))//队列中尚有需要执行的任务,需要等待执行完成,不能设置为TIDYING
      return;
      //上面一坨条件判断说白了就是在等线程池状态为SHUTDOWN并且队列为空或者线程池状态为STOP才会继续,否则放弃终止操作。

      if (workerCountOf(c) != 0) { // Eligible to terminate
      interruptIdleWorkers(ONLY_ONE);//ONLY_ONE在这里为true,也就是仅仅中断一个空闲的worker。
      //补充一下:interruptIdleWorkers的作用是因为在getTask方法中执行workQueue.take()时,如果不执行中断会一直阻塞。在shutdown方法中,会中断所有空闲的工作线程,如果在执行shutdown时工作线程没有空闲,然后又去调用了getTask方法,这时如果workQueue中没有任务了,调用workQueue.take()时就会一直阻塞。所以每次在工作线程结束时调用tryTerminate方法来尝试中断一个空闲工作线程,避免在队列为空时取任务一直阻塞的情况。(引用自:https://www.cnblogs.com/liuzhihu/p/8177371.html)
      return;
      }

      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//CAS操作将线程池设置为TIDYING状态
      try {
      terminated();//执行一些清理操作
      } finally {
      ctl.set(ctlOf(TERMINATED, 0));//设置线程池状态为TERMINATED
      termination.signalAll();//通知条件变量termination(也就是awaitTermination方法可以继续执行了)
      }
      return;
      }
      } finally {
      mainLock.unlock();
      }
      // else retry on failed CAS
      }
      }

  • public List shutdownNow()

    • 立即关闭线程池,设置状态为STOP,不再接受新的任务,并且中断正在运行的任务,返回队列中未执行的任务。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      public List<Runnable> shutdownNow() {
      List<Runnable> tasks;
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      checkShutdownAccess();//检查访问权?(这块有点懵)
      advanceRunState(STOP);//cas方式设置状态为STOP
      interruptWorkers();//中断正在运行的线程
      tasks = drainQueue();//将队列中未执行的任务返回
      } finally {
      mainLock.unlock();
      }
      tryTerminate();//检测并尝试终止线程池
      return tasks;
      }
    • 看一下里面具体的方法,checkShutdownAccess、advanceRunState和tryTerminate见上
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      private void interruptWorkers() {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      for (Worker w : workers)//遍历workers集合
      w.interruptIfStarted();//依次中断
      } finally {
      mainLock.unlock();
      }
      }
      Worker的interruptIfStarted方法如下:
      void interruptIfStarted() {
      Thread t;
      //state小于0是不可中断的标识,只能在大于等于0的时候进行中断
      if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try {
      t.interrupt();
      } catch (SecurityException ignore) {
      }
      }
      }
      //转移队列剩余任务方法
      private List<Runnable> drainQueue() {
      BlockingQueue<Runnable> q = workQueue;
      ArrayList<Runnable> taskList = new ArrayList<Runnable>();
      q.drainTo(taskList);//将q的所有任务转移到taskList中
      if (!q.isEmpty()) {//上一步操作可能会失败,再次检查(什么情况会失败?这块我也没太明白)
      for (Runnable r : q.toArray(new Runnable[0])) {
      if (q.remove(r))
      taskList.add(r);
      }
      }
      return taskList;
      }
  • public boolean isShutdown()

    • 通过ctl获取线程池的状态,没啥可说的
  • public boolean isTerminating()

    • 通过ctl获取线程池的状态,只要不是运行的,不是TERMINATED,就处于这个状态
  • public boolean isTerminated()

    • 通过ctl获取线程池的状态,并且状态是TERMINATED
  • public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException

    • 带超时的等待线程池状态变为TERMINATED
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      public boolean awaitTermination(long timeout, TimeUnit unit)
      throws InterruptedException {
      long nanos = unit.toNanos(timeout);
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      for (;;) {
      if (runStateAtLeast(ctl.get(), TERMINATED))//CAS循环判断状态是否为TERMINATED
      return true;
      if (nanos <= 0)//超时则返回false
      return false;
      nanos = termination.awaitNanos(nanos);//还记得上面的termination.signalAll()吧,就是这里等待通知
      }
      } finally {
      mainLock.unlock();
      }
      }
  • public boolean prestartCoreThread()

    • 调用该方法,则执行启动一个核心线程,源码比较简单,只要不足corePoolSize,就执行增加操作。
      1
      2
      3
      4
      public boolean prestartCoreThread() {
      return workerCountOf(ctl.get()) < corePoolSize &&
      addWorker(null, true);
      }
  • public int prestartAllCoreThreads()

    • 一次性启动Core工作线程到corePoolSize,返回启动了几个线程。
      1
      2
      3
      4
      5
      6
      public int prestartAllCoreThreads() {
      int n = 0;
      while (addWorker(null, true))
      ++n;
      return n;
      }
  • public boolean remove(Runnable task)

    • 移除一个task,移除操作之后会执行tryTerminate()方法
      1
      2
      3
      4
      5
      6
      public boolean remove(Runnable task) {
      boolean removed = workQueue.remove(task);
      //会检查线程池状态,并根据状态决定是否终止线程池(特别针对的场景就是shudown状态+空队列,就可以进入下一个状态)
      tryTerminate(); // In case SHUTDOWN and now empty
      return removed;
      }
  • public void purge()

    • 尝试从队列中移除所有已经取消了的Future任务
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      public void purge() {
      final BlockingQueue<Runnable> q = workQueue;
      try {
      Iterator<Runnable> it = q.iterator();
      while (it.hasNext()) {
      Runnable r = it.next();
      //移除掉已经取消的Future任务
      if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
      it.remove();
      }
      } catch (ConcurrentModificationException fallThrough) {//遍历过程中发生了其他修改,快速失败采用数组快照方式删除
      // Take slow path if we encounter interference during traversal.
      // Make copy for traversal and call remove for cancelled entries.
      // The slow path is more likely to be O(N*N).
      for (Object r : q.toArray())
      if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
      q.remove(r);
      }
      //会检查线程池状态,并根据状态决定是否终止线程池(特别针对的场景就是shudown状态+空队列,就可以进入下一个状态)
      tryTerminate(); // In case SHUTDOWN and now empty
      }

待分析内容

简单概述

  • 上面这一堆只是简单分析了一下ThreadPoolExecutor内部的方法,这个类继承了AbstractExecutorService,之后会在分析一下AbstractExecutorService,此外ThreadPoolExecutor内部还有一个Worker类,它承了AbstractQueuedSynchronizer(AQS),这两个也是后面要分析的重点。尤其是AQS,后面的各种锁相关都是依赖于它

TODO项

  • AbstractExecutorService
  • Worker
  • AbstractQueuedSynchronizer

这两个都是Java提供的原生写时复制的容器(也就是java.util.concurrent包下的)。CopyOnWrite的基本思路是在修改(包括增加和修改)之前,拷贝出一份快照,对快照进行修改,然后替换原引用。读取操作则没有进行加锁,所以适用于读多写少的场景。另外由于使用了快照模式(迭代器,修改等操作),因此不能保证强一致性,只能保证最终一致性。

CopyOnWriteArraySet是对CopyOnWriteArrayList的包装,所以重点关注一下CopyOnWriteArrayList。

CopyOnWriteArrayList

从增删改查4个方面总结下这个容器(相对ConcurrentHashMap,CopyOnWrite的实现实在是简单太多了。。。)

增加方法就2个:

  • public boolean add(E e)
    • 整体思路比较简单,先获取排他锁(同一时刻只能有一个线程进行修改操作),然后拷贝一份新的数组并插入新的元素(此时读取操作读的还是旧的数组),最后替换引用并释放锁。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      public boolean add(E e) {
      final ReentrantLock lock = this.lock;
      lock.lock();//获取排他锁
      try {
      Object[] elements = getArray();//获取当前引用(可以理解为快照)
      int len = elements.length;//当前的长度
      Object[] newElements = Arrays.copyOf(elements, len + 1);//拷贝到一个新的数组中
      newElements[len] = e;//末尾增加新的元素
      setArray(newElements);//替换引用
      return true;//返回结果
      } finally {
      lock.unlock();//释放排他锁
      }
      }
  • public void add(int index, E element)
    • 整体流程和上一个方法基本一致,区别在于进行索引的有效判断,同时检测是不是在数组的最后插入,拷贝的过程是调用System.arraycopy进行分区间拷贝。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      public void add(int index, E element) {
      final ReentrantLock lock = this.lock;
      lock.lock();//一样获取独占锁
      try {
      Object[] elements = getArray();//获取当前引用
      int len = elements.length;
      if (index > len || index < 0)//越界检查
      throw new IndexOutOfBoundsException("Index: "+index+
      ", Size: "+len);
      Object[] newElements;
      int numMoved = len - index;
      if (numMoved == 0)//判断是不是最后一个元素
      newElements = Arrays.copyOf(elements, len + 1);//直接同上一个方法,生成len+1长度的数组
      else {
      newElements = new Object[len + 1];//生成新的空数组
      System.arraycopy(elements, 0, newElements, 0, index);//拷贝0~index
      System.arraycopy(elements, index, newElements, index + 1,
      numMoved);//拷贝index+1 ~ 最后
      }
      newElements[index] = element;//index位置填充
      setArray(newElements);//替换引用
      } finally {
      lock.unlock();//释放锁
      }
      }

删除操作对外暴露了2个方法,一个是通过索引下标进行删除,一个是找到指定的Object进行删除。

  • public E remove(int index)
    • 这个方法实现和 add(int index, E element) 基本类似,不做过多解释了。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      public E remove(int index) {
      final ReentrantLock lock = this.lock;
      lock.lock();
      try {
      Object[] elements = getArray();
      int len = elements.length;
      E oldValue = get(elements, index);//获取指定下标元素
      int numMoved = len - index - 1;
      if (numMoved == 0)//判断是否为最后一个元素
      setArray(Arrays.copyOf(elements, len - 1));//形成新的0~len-1数组
      else {
      Object[] newElements = new Object[len - 1];
      //跳过要删除的元素
      System.arraycopy(elements, 0, newElements, 0, index);
      System.arraycopy(elements, index + 1, newElements, index,
      numMoved);
      setArray(newElements);//替换引用
      }
      return oldValue;
      } finally {
      lock.unlock();
      }
      }
  • public boolean remove(Object o)
    • 首先获取数组引用,然后尝试寻找元素,找到了则进入删除操作,找不到则直接返回false
      1
      2
      3
      4
      5
      public boolean remove(Object o) {
      Object[] snapshot = getArray();//获取引用快照
      int index = indexOf(o, snapshot, 0, snapshot.length);//尝试寻找(注意,此时并未加锁,如果此时插入了数据或者其他线程插入了数据,是看不到的,需要在删除的时候重新进行查找)
      return (index < 0) ? false : remove(o, snapshot, index);//找到了则进行删除(index>=0的情况),找不到则返回false
      }
    • 看一下具体的删除操作
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      private boolean remove(Object o, Object[] snapshot, int index) {
      final ReentrantLock lock = this.lock;
      lock.lock();//加锁
      try {
      Object[] current = getArray();//重新获取一次快照引用
      int len = current.length;
      if (snapshot != current) findIndex: {//判断在上一步中获取到的快照是否发生了改变,如果没有改变,则直接进行删除,如果发生了改变,则需要进一步处理,也就是下面的逻辑。
      int prefix = Math.min(index, len);//判断一下以防数组越界
      for (int i = 0; i < prefix; i++) {
      if (current[i] != snapshot[i] && eq(o, current[i])) {//依次遍历查找,找到则结束if语句块,并更新index
      index = i;
      break findIndex;
      }
      }
      if (index >= len)//上述条件并未找到,则判断是否已经超过索引长度(其他线程已经删除元素了,可能会执行到这里)
      return false;
      if (current[index] == o)//这块没想明白什么情况会执行到这里
      break findIndex;
      index = indexOf(o, current, index, len);//重新查找,因为已经加锁了,所以不会有其他线程进行修改
      if (index < 0)
      return false;//没有找到
      }
      //后面就简单了,还是重新拷贝,更新引用
      Object[] newElements = new Object[len - 1];
      System.arraycopy(current, 0, newElements, 0, index);
      System.arraycopy(current, index + 1,
      newElements, index,
      len - index - 1);
      setArray(newElements);
      return true;
      } finally {
      lock.unlock();
      }
      }

修改操作相对比较简单,通过索引下标直接更新即可,方法如下:

  • public E set(int index, E element)
    • 操作也不复杂,加锁后重新拷贝并替换,唯独注意就是元素没有变化,也要重新更新一下引用(注释说是为了确保volatile写语义)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      public E set(int index, E element) {
      final ReentrantLock lock = this.lock;
      lock.lock();
      try {
      Object[] elements = getArray();
      E oldValue = get(elements, index);

      if (oldValue != element) {
      int len = elements.length;
      Object[] newElements = Arrays.copyOf(elements, len);
      newElements[index] = element;
      setArray(newElements);
      } else {
      // Not quite a no-op; ensures volatile write semantics
      setArray(elements);
      }
      return oldValue;
      } finally {
      lock.unlock();
      }
      }

查询操作外部暴露了4个方法,其实内部都是委托给对应的私有方法,只是没有index参数的方法委托的时候传值是0。

  • public int indexOf(Object o)
  • public int indexOf(E e, int index)
  • public int lastIndexOf(Object o)
  • public int lastIndexOf(E e, int index)
    前两个方法委托给 private static int indexOf(Object o, Object[] elements, int index, int fence) 执行
    后两个方法委托给 private static int lastIndexOf(Object o, Object[] elements, int index) 执行
    下面整理一下这两个查找方法:
  • indexOf 只是看一下元素是否为空,为空则找null,不为空则直接进行遍历查找
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    private static int indexOf(Object o, Object[] elements,
    int index, int fence) {
    if (o == null) {
    for (int i = index; i < fence; i++)
    if (elements[i] == null)
    return i;
    } else {
    for (int i = index; i < fence; i++)
    if (o.equals(elements[i]))
    return i;
    }
    return -1;
    }
  • lastIndexOf 和 indexOf 几乎相同,只是倒着查找而已。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private static int lastIndexOf(Object o, Object[] elements, int index) {
    if (o == null) {
    for (int i = index; i >= 0; i--)
    if (elements[i] == null)
    return i;
    } else {
    for (int i = index; i >= 0; i--)
    if (o.equals(elements[i]))
    return i;
    }
    return -1;
    }

其他方法

  • contains方法,也是直接调用indexOf方法,判断一下索引位置是不是>=0,就不多说了

  • get方法,直接获取一下数组引用(获取引用之后,其他线程修改,这里也看不到了。。。),然后取对应下标。

  • addIfAbsent方法,CopyOnWriteArraySet就是基于这个方法实现的。先通过indexOf方法查找,找到直接返回false,找不到则加锁进行添加,添加之前会判断一下引用是否改变,和remove比较相似。代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    private boolean addIfAbsent(E e, Object[] snapshot) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    Object[] current = getArray();
    int len = current.length;
    if (snapshot != current) {//引用发生改变,则需要重新查找,如果还是没找到,则继续添加
    // Optimize for lost race to another addXXX operation
    int common = Math.min(snapshot.length, len);
    for (int i = 0; i < common; i++)//遍历查找,并且找到了,则返回false
    if (current[i] != snapshot[i] && eq(e, current[i]))
    return false;
    if (indexOf(e, current, common, len) >= 0)//没太明白为什么又要再次查找。。
    return false;
    }
    Object[] newElements = Arrays.copyOf(current, len + 1);
    newElements[len] = e;
    setArray(newElements);
    return true;
    } finally {
    lock.unlock();
    }
    }
  • containsAll方法, 实现也比较简单暴力,获取快照后,直接for循环对每一个元素进行判断,就不附代码了。

  • removeAll方法,也不算复杂,加锁之后遍历元素,不在待删除集合中,则加入新数组中,然后重新拷贝一份即可

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public boolean removeAll(Collection<?> c) {
    if (c == null) throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    Object[] elements = getArray();
    int len = elements.length;
    if (len != 0) {
    // temp array holds those elements we know we want to keep
    int newlen = 0;
    Object[] temp = new Object[len];
    for (int i = 0; i < len; ++i) {
    Object element = elements[i];
    if (!c.contains(element))
    temp[newlen++] = element;
    }
    if (newlen != len) {
    setArray(Arrays.copyOf(temp, newlen));
    return true;
    }
    }
    return false;
    } finally {
    lock.unlock();
    }
    }
  • retainAll方法,和removeAll方法类似,对应的是两个集合都包含的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public boolean retainAll(Collection<?> c) {
    if (c == null) throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    Object[] elements = getArray();
    int len = elements.length;
    if (len != 0) {
    // temp array holds those elements we know we want to keep
    int newlen = 0;
    Object[] temp = new Object[len];
    for (int i = 0; i < len; ++i) {
    Object element = elements[i];
    if (c.contains(element))
    temp[newlen++] = element;
    }
    if (newlen != len) {
    setArray(Arrays.copyOf(temp, newlen));
    return true;
    }
    }
    return false;
    } finally {
    lock.unlock();
    }
    }
  • addAllAbsent方法, indexOf(e, cs, 0, added) 采用将遍历过的待添加元素数组进行替换,节省了新开辟数组的空间,这是一个用的很巧的方法。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public int addAllAbsent(Collection<? extends E> c) {
    Object[] cs = c.toArray();
    if (cs.length == 0)
    return 0;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    Object[] elements = getArray();
    int len = elements.length;
    int added = 0;
    // uniquify and compact elements in cs
    for (int i = 0; i < cs.length; ++i) {
    Object e = cs[i];
    if (indexOf(e, elements, 0, len) < 0 &&
    indexOf(e, cs, 0, added) < 0)
    cs[added++] = e;
    }
    if (added > 0) {
    Object[] newElements = Arrays.copyOf(elements, len + added);
    System.arraycopy(cs, 0, newElements, len, added);
    setArray(newElements);
    }
    return added;
    } finally {
    lock.unlock();
    }
    }
  • clear方法,就是设置一个空数组,就不附代码了

  • addAll方法,也比较简单,转成数组之后直接合并,带索引位置的,就是跳过一部分后,在插入,最后补充剩余的

  • sort方法,也比较简单,拷贝一份快照后,对快照进行排序,然后替换引用。

  • equals 和 hashCode ,这两个方法都进行了重写,注意一下,都是使用快照进行比较,所以都是弱一致性的。

  • iterator 迭代器是创建一个COWIterator迭代器,不支持删除和修改操作,只能进行读取操作。

  • 其他就不进行分析了

CopyOnWriteArraySet

其实没啥可分析的,内部就一个CopyOnWriteArrayList,所有方法都是调用 CopyOnWriteArrayList的方法,去重用的是addIfAbsent和addAllAbsent方法,= =感觉这个Set性能有点差。。。远不及HashSet和TreeSet。

总结

平时项目中时不时还是会用到CopyOnWriteArrayList,CopyOnWriteArraySet至少我还没用到过,而且看了实现,也不太敢用。。。。
回归主题:

  • CopyOnWrite相关读取操作都是不加锁的,拷贝一份内部数组的引用,就开始读取了,不用加锁的原因就是所有的数组实际上都不会发生改变,因为所有的修改操作都是生成一份新的数组。
  • 因为读取或者迭代器,乃至hashcode,equals方法都是基于快照进行相关操作,所以可能读到的数据并不是最新的,也就是无法保证实时的一致性(就是所谓的弱一致性),但是最终数据还是一致的。
  • 因为几乎每次修改,都会生成新的数组,如果写入比较频繁,可能产生大量垃圾,加重GC负担,所以CopyOnWrite最适合的场景就是读多写少。
  • CopyOnWriteArrayList源码上有很多值得学习的小技巧,比如addAllAbsent方法中用替换数组中读过的位置存储待合并字符,省去了开辟新数组的开销。

起因

好像Java面试必不可少的一个问题就是,Java中集合有哪些?分别有什么特点。照搬各种《XXX从入门到放弃》,集合有2种类型,一个是有序可重复的List,一个是无序不可重复的Set,可是真的用起来的时候好像就不是简单的这样了。

先来看一下集合的整体关系图(并发包中的集合没有考虑进来,仅看java.util包)
Java集合类

第一眼看上去我也懵,本来以为自己天天用的那些集合类已经差不多了,然后发现一坨坨的没见过没用过的。

那就啃吧。。

Collection

  • Collection应该是老大哥级别的了,算是集合类的鼻祖(迭代器忽略),定义了一个集合应该有的基本方法,包括增删迭代等,这个就不多说了。
  • 1.8版本开始,增加了流式操作的几个default方法(先不关注了)

List

List应该是集合中用的最多最多的了,平时搬砖,基本上几行代码就要加上一个List存储各种元素。忽略抽象方法,整理一下对应的实现类

ArrayList

ArrayList应该是日常搬砖使用最多的的实现了,特点如下:

  • 底层实现是数组,对应代码:
    1
    transient Object[] elementData;
  • 实现了动态扩容方法,简单说就是容量不够了,我就新建一个数组,然后将原来的数据拷贝到新数组中。从代码int newCapacity = oldCapacity + (oldCapacity >> 1);中也可以看出来,每次扩容变为原本的1.5倍。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    private void grow(int minCapacity) {
    // overflow-conscious code
    int oldCapacity = elementData.length;
    int newCapacity = oldCapacity + (oldCapacity >> 1);
    if (newCapacity - minCapacity < 0)
    newCapacity = minCapacity;
    if (newCapacity - MAX_ARRAY_SIZE > 0)
    newCapacity = hugeCapacity(minCapacity);
    // minCapacity is usually close to size, so this is a win:
    elementData = Arrays.copyOf(elementData, newCapacity);
    }
  • 线程不安全,所有方法没有加锁,多线程存在并发问题
  • 因为底层实现是数组,所以随机读取速度很快,并不是说不适合插入数据,而是不适合在中间或者头部插入数据。因为插入数据之后,当前位置后面的元素都要往后移动,成本相对来说比较大了。代码如下:
    1
    2
    System.arraycopy(elementData, index, elementData, index + 1, size - index);
    elementData[index] = element;
  • 删除操作也是同理,在末尾操作其实影响不大,但是在中间和数组起始位置操作成本就有点高了。
  • 更新操作影响很小,直接找到对应数组下标,然后替换就可以了。
  • indexOf和contains以及lastIndexOf方法都是直接进行遍历,因为数组是无序的,也没办法采用二分法之类的进行快速查找。所以尽可能不要直接使用List的查找方法。
  • size方法成本不高,并不是每次都进行统计,而是内部存储了一个size变量,每次增删操作回进行更新。
  • 暂时想到的就这么多,以后想起来再补充。

Vector

这个可是个老古董了,从JDK1.0开始就存在了(别问我怎么知道的,那会我也没用过Java,是文档自己写的。。。。),正因为是老古董,所以现在已经不是很推荐使用了,原因就是效率很低下,因为很多方法都暴力的增加了synchronized关键字,性能很低下。做个简单总结吧:

  • 因为很多方法都加上了synchronized关键词,导致整体性能较差,不推荐使用
  • 底层实现也是数组,特性和ArrayList差不多。
  • 注意:Vector没有实现Serializable接口
  • 关于扩容:ArrayList每次扩容是1.5倍,Vector是2倍。代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    private void grow(int minCapacity) {
    // overflow-conscious code
    int oldCapacity = elementData.length;
    int newCapacity = oldCapacity + ((capacityIncrement > 0) ?
    capacityIncrement : oldCapacity);//扩充一倍
    if (newCapacity - minCapacity < 0)
    newCapacity = minCapacity;
    if (newCapacity - MAX_ARRAY_SIZE > 0)
    newCapacity = hugeCapacity(minCapacity);
    elementData = Arrays.copyOf(elementData, newCapacity);
    }
    capacityIncrement是构造方法传进来的,如果不指定,则传0。

Stack

这个类已经快被遗忘了,简单概述一下。

  • 1.0时代的远古产物,继承了Vector,所以也是各种synchronized关键字,性能低下
  • 官方注释已经不推荐使用了,同样的功能可以使用Deque实现
    1
    Deque<Integer> stack = new ArrayDeque<Integer>();

LinkedList

你以为LinkedList仅仅是一个链表实现的List的么??那你就是图样图森破了,来看看强大的LinkedList吧。

  • 看一下接口层面:List、deque和Queue,也就是说LinkedList不仅仅是一个list集合,同时也是一个双向队列,当然也可以作为单向队列来使用。所以根据场景,上面的Stack也可以使用LinkedList进行替换
  • 底层的实现是基于链表,基本存储数据的元素是Node,代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    private static class Node<E> {
    E item;
    Node<E> next;
    Node<E> prev;

    Node(Node<E> prev, E element, Node<E> next) {
    this.item = element;
    this.next = next;
    this.prev = prev;
    }
    }
  • 正是因为基于链表实现,所以理论上在任意位置进行增删操作都是O(1)的时间复杂度,但是为什么我说是理论上呢?因为实际进行增删的时候,必须要先找到对应的位置吧?这个查找的过程时间复杂度可就是O(n)了。
  • LinkedList同样没有加任何同步措施,因此也是线程不安全的。
  • 说一个坑点:千万不要在for循环中通过索引的方式去获取元素(之前实习生干了这个事。。。),因为链表的通过索引方式进行随机读取的时间复杂度是O(n)!

简单汇总一下

ArrayList LinkedList Vector Stack
实现方式 数组 链表 数组 数组
线程安全
优势 适合随机读,末尾写 适合随机写,顺序读 线程安全 线程安全
劣势 不适合随机写入 不适合随机读取 性能差 性能差
扩容 1.5 - 2 -

Set

上面简单总结了一下List相关实现,接下来看看狐假虎威的Set。为啥说Set是狐假虎威呢?看看对应的实现类就明白了,基本上都是Map套了个壳(Map稍后总结)

HashSet

基于Hash实现的Set,内层实现是HashMap,所有的操作都是HashMap的Key的操作,而Map的实际Value则是一个Object对象。总结一下特点:

  • 判断是否存在速度很快,基于Hash实现如果不出现冲突,基本都是O(1)的操作。
  • 线程不安全,需要自己实现线程同步方式
  • 元素唯一(前提重写HashCode和equals方法,只有两者都相同才被认为是同一个元素)
  • 不保证顺序,因为基于Hash实现,无法保证读取时候的顺序(也就是通过迭代器方式读取无法保证顺序,不过貌似重写HashCode之后,可以在一定程度上控制顺序,但是最好不要这么用。。)
  • 因为底层实现是HashSet,所以也存在初始容量和负载因子,因此使用的时候如果事先知道存储大小,最好指定一下大小和负载因子,减少扩容消耗。
  • 因为HashSet支持null作为key,所以HashSet也可以存储null元素

LinkedHashSet

LinkedHashSet也是一个壳,继承了HashSet,注意一下HashSet内部还有一个带有boolean的构造方法,调用这种构造方法,则内部实现不再是HashMap,而是LinkedHashMap。

  • LinkedHashSet和HashSet最大的区别就是能保证元素插入的顺序和通过迭代器读取的顺序是一致的(但是不是经过排序的,是保留插入的顺序)。
  • 除了有序这个之外,其他特点和HashSet一样,因为继承了嘛(写这个类的人,真的是懒到极致的。。向他学习!)

TreeSet

看到TreeSet是不是立即想到了TreeMap?对的,TreeSet内部就是一个NavigableMap,NavigableMap又是什么?java.util包下原生的实现且暴露出来的好像只有。。。。TreeMap。。。

  • 内部实现是TreeMap,也就是基于红黑树(啥是红黑树?。。。自行百度。。),所以整体操作复杂度事O(log(n)),表面上看不如HashSet的O(1)速度快,但是一旦出现大量Hash冲突的时候,HashSet性能将急剧下降,因为冲突导致查询变为链表遍历(好像1.8还是1.7开始,冲突元素个数增加到8就会进行树化,防止链表过长),而TreeSet不会存在这个问题。
  • TreeSet实现了NavigableSet接口和SortedSet接口,也就是说TreeSet中的元素是有序的,同时是支持范围查询,查找大于或者小于某个元素的元素或者集合(具体看NavigableSet接口),这些都是HashSet无法提供的。
  • 线程不安全,补充:因为红黑树实现复杂,并发粒度控制困难(应该是这个原因),官方没有提供TreeSet对应的并发类,而是提供了基于跳表实现的并发类(后面再说)
  • 其他想到了再补充。。

EnumSet

EnumSet是一个抽象类,有两个实现:

  • RegularEnumSet
  • JumboEnumSet

注意一下,这两个类都是不对外暴露的,对外统一暴露的是EnumSet。这两个类有啥区别呢?RegularEnumSet存储的是元素个数小于等于64个,JumboEnumSet则是超过64个。
为啥要单独出来一个EnumSet呢?HashSet,TreeSet也是可以存储枚举的啊,查了一堆资料(实际上我也没用过这玩意。。),总结如下:

  • EnumSet的速度很快,原因是底层用了elements进行位运算,也就是说EnumSet并不直接存放枚举对象,而是存储一个对应类和elements,通过位运算来判断Set中有哪些元素,速度自然要快得多。
  • 一旦元素的枚举类型确定那么集合就确定了(因为要通过枚举类型进行位判断,如果更换了枚举类型,会导致结果出错,所以不允许修改)
  • EnumSet只能存放一种枚举类型的元素(原因同上)

Queue

一个先入先出的数据结构,util包下实现好像只有下面3个,这个主要在juc包下实现类较多(各种阻塞队列)

LinkedList

前面已经说过,不再多说了。

ArrayDeque

和LinkedList相比,最大不同就是底层实现是依赖于一个数组,简单汇总一下其特点:

  • 实现依赖于一个循环数组
  • 扩容: 扩容直接将容量翻倍,然后执行数组拷贝
  • 容量:要求必须是2的幂次方(方便进行位移运算)
  • 优势:和LinkedList相比,无需用Node对数据进行包裹,而且数组通过下标访问速度很快
  • 应用场景:额。。。其实我也没怎么用过,感觉常用栈和队列都可以用这个实现(好吧,以前我都是用LinkedList实现栈的操作。。。)

PriorityQueue

这个感觉平时用的也很少,是一个带有优先级的队列(并发包中的优先队列貌似使用场景更多一些。。),这个研究不多,直接当个搬运工吧(参考:https://www.cnblogs.com/mfrank/p/9614520.html)

  • 内部是根据小顶堆的结构进行存储的
  • 构造方法需要传入一个比较器,用于判断优先级
  • 内部实际上也是使用一个数组进行数据存储,同时有一个heapify()方法,用于将数组进行堆化(具体过程就不描述了。。。)
  • 应用场景,基本上就是堆的应用场景,比如寻找topN之类的

顺便肯一下另外一组容器

Map

Map我的理解就是存储键值对的容器,基本上每一种开发语言都有这种容器,比如Python,C#的字典,golang的map,应该说Map是和数组一个级别的重要容器了。最常用的应该是基于Hash实现的HashMap,当然还有基于红黑树的TreeMap。先看一下Map相关的类图:
Java集合类
简单总结一下:

HashMap

最常用的Map,没有之一(至少我工作这两年看到的Map,九成以上都是HashMap),应该也是面试必问容器,后面估计要专门整理一篇HashMap的总结了(网上各种总结已经一大把了。。),简单总结一下特点:

  • 基于hash的方法,能够快速通过key找到对应的value
  • 内部存储数据是基于数组,Node<K,V>[] table;
  • 线程不安全(几乎面试都会问到,然后就自然转到了juc的并发包了)
  • Key建议使用字符串,当然用自定义对象也可以,但是要重写hashcode和equals方法,否则不保证正确性了。
  • hash冲突的解决是通过链表方式,链表长度超过8以后,转为红黑树,当长度减少到6一下,再次转换为链表。(原因是怕链表长度过长,导致查询速度过慢,而冲突变少之后使用链表和树速度差别小,但是复杂度来看,链表要简单。。好吧,也是强行解释)
  • 迭代遍历不保证顺序
  • 允许null作为key和value

Hashtable

远古产物,并且类命名还不对,正确命名应该是HashTable,估计是当时开发人员粗心,写成了Hashtable,然后为了兼容性,那就错着把。。。功能上和HashMap基本一样,简单总结一下:

  • 线程安全,但是性能低下,全部基于synchronized关键词实现。
  • 不允许null作为key和value

LinkedHashMap

  • 与HashMap相比,保留的key的插入顺序性,遍历的时候和插入的顺序一致
  • 原理是内部维护了一条双向链表,记录插入的顺序
  • 额外增加了空间和时间上的开销
  • 应用场景
    • 保留插入顺序的遍历场景
    • LRU缓存的实现(可以看一下MyBatis的缓存实现,其中就有基于LinkedHashMap的LRU缓存)

TreeMap

这个因为红黑树实现,有点复杂(面试在单独复习红黑树吧。。),所以就不管内部具体实现了,总结一下特点

  • 线程不安全,即使是在并发包中也没有TreeMap的并发类
  • 实现了SortedMap接口,说明Key是有序的
  • 遍历的时候根据Key的自然顺序进行,或者指定Comparator比较器
  • 实现了NavigableMap接口,也就是说支持区间范围或者比大小操作(基于Key的)
  • 整体操作复杂度均为O(log(n))

EnumMap

针对枚举类作为Key的情形进行优化的Map,内部通过数组存储,查找的时候直接通过枚举的ordinal作为index快速查询。

  • 只能支持单一类型枚举

IdentityHashMap

陌生么?陌生。。。陌生就对了,因为日常开发中,压根就不会用到这玩意。。这玩意干嘛用的,它实际上是严格版本的HashMap,有多严格?引用必须相等!

HashMap中判断key相等的依据是key.equals(otherKey),而IdentityHashMap判断key相等的依据是key==otherKey,这种严格的限制,恕我无知。。我实在是找不到应用场景。。关键这个类还是大神Doug Lea写的。。。大神的思维。。不懂。。不懂。。

WeakHashMap

这个容器使用之前最好先了解一下Java中的引用(强软弱虚),WeakHashMap是一种弱key实现的容器,使用场景主要还是缓存吧(反正我没用过。。。),说一下特点

  • 当key被GC回收后,对应Map中的KeyValue対也会被回收,附代码示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public static void main(String[] args) throws InterruptedException {
    WeakHashMap<String, Object> map = new WeakHashMap<>();
    String k1 = new String("k1"); //注意一定要使用new String("xxx"),形式
    String k2 = new String("k2");
    String k3 = new String("k3");
    map.put(k1,new Object());
    map.put(k2,new Object());
    map.put(k3,new Object());
    System.out.println(map);
    System.gc();
    Thread.sleep(500);
    System.out.println(map);
    k1 = null;
    k2 = null;
    k3 = null;
    System.out.println("Key=null -> " +map);
    System.gc();
    Thread.sleep(500);
    System.out.println("After GC -> " +map);
    }

Properties

以前我还真不知道Properties竟然也是Map的实现类,内部主要是各种读取配置文件相关逻辑,存储方面由于继承了Hashtable,所以也是线程安全的,关于这个就不分析啥了。。

总结

糊里糊涂整理了一下java.util包下面的集合相关类(容器类也行。。),发现了几个平时开发中没用过的容器,但是其实是都可以用的。。。比如ArrayDeque,比如Enum相关Set和Map(恕我无知,之前真的都是通过HashSet和HashMap实现的。。。)。等后续有时间了,整理一下并发包下面的容器(好像已经烂大街了。。。)

static方法

public static CompletableFuture supplyAsync(Supplier supplier)

  • 提交一个Supplier任务,异步执行,可以获取任务返回结果,使用ForkJoinPool.commonPool()执行任务。

public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

  • 提交一个Supplier任务,异步执行,可以获取任务返回结果,使用指定的线程池执行

public static CompletableFuture runAsync(Runnable runnable)

  • 提交一个Runnable任务,异步执行,无返回结果,使用ForkJoinPool.commonPool()执行任务。

public static CompletableFuture runAsync(Runnable runnable, Executor executor)

  • 提交一个Supplier任务,异步执行,无返回结果,使用指定的线程池执行

public static CompletableFuture completedFuture(U value)

  • 新建一个完成的CompletableFuture,通常作为计算的起点阶段。

public static CompletableFuture allOf(CompletableFuture<?>… cfs)

  • 接收一个由CompletableFuture 构成的数组,需要等待多个 CompletableFuture 对象执行完毕,执行join操作可以等待CompletableFuture执行完成。

public static CompletableFuture anyOf(CompletableFuture<?>… cfs)
  • 接收一个由CompletableFuture 构成的数组,返回由第一个执行完毕的 CompletableFuture 对象的返回值构成的CompletableFuture

    示例代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ThreadTest1 {
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    //创建一个直接完成的CompletableFuture
    String now = CompletableFuture.completedFuture("Test")
    .getNow("Fail");
    println("completedFuture: " + now);
    //创建一个带有返回值的CompletableFuture
    CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
    sleep(100);
    return "Test";
    }, pool);
    //延迟100ms,这里获取是default值
    now = task.getNow("Fail");
    println("supplyAsync: now: " + now);
    //等待任务完成后输出
    println("supplyAsync: get: " + task.get());
    System.out.println("---------------------分割线----------------------");
    //耗时100ms的任务
    CompletableFuture<Void> task100 = CompletableFuture.runAsync(() -> {
    sleep(100);
    println("runAsync :" + Thread.currentThread().getName() + " task100 done");
    }, pool);
    //耗时200ms的任务
    CompletableFuture<Void> task200 = CompletableFuture.runAsync(() -> {
    sleep(200);
    println("runAsync :" + Thread.currentThread().getName() + " task200 done");
    }, pool);
    //任意一个完成就会继续执行
    CompletableFuture.anyOf(task100, task200).join();
    println("anyOf Done");
    //全部完成才会继续执行
    CompletableFuture.allOf(task100, task200).join();
    println("allOf Done");
    //关闭线程池
    pool.shutdown();
    }

    private static void sleep(long time) {
    try {
    Thread.sleep(time);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    private static void println(Object object){
    System.out.println(sdf.format(new Date()) + ": " + object);
    }
    }

    运行结果

    1
    2
    3
    4
    5
    6
    7
    8
    2019-06-23 18:02:03.552: completedFuture: Test
    2019-06-23 18:02:03.604: supplyAsync: now: Fail
    2019-06-23 18:02:03.710: supplyAsync: get: Test
    -------------------------------------------
    2019-06-23 18:02:03.813: runAsync :pool-1-thread-2 task100 done
    2019-06-23 18:02:03.813: anyOf Done
    2019-06-23 18:02:04.012: runAsync :pool-1-thread-3 task200 done
    2019-06-23 18:02:04.012: allOf Done

    实例方法

    • 实例方法整体比较规则,一个标准执行方法,一个异步执行方法,一个指定异步线程执行方法

    thenApply

    • then是指在当前阶段正常执行完成后(正常执行是指没有抛出异常)进行的操作。Apply是指将一个Function作用于之前阶段得出的结果(即将上一步的结果进行转换)
    • public CompletableFuture (Function<? super T,? extends U> fn)
    • public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn)
    • public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      public static void thenApplyAsyncDemo(){
      Integer integer = CompletableFuture
      .completedFuture("task 1")
      .thenApplyAsync(x -> {
      String intString = x.split(" ")[1];
      return Integer.valueOf(intString);
      })
      .join();
      System.out.println("thenApplyAsyncDemo: " + integer);
      }
      --------- 输出 -----------
      thenApplyAsyncDemo: 1

    thenAccept

    • 下一个Stage接收了当前Stage的结果但是在计算中无需返回值(可以简单认为这里就是消费终点,因为没有返回值。当然下一步不依赖当前返回值的情况除外)
    • public CompletableFuture thenAccept(Consumer<? super T> action)
    • public CompletableFuture thenAcceptAsync(Consumer<? super T> action)
    • public CompletableFuture thenAcceptAsync(Consumer<? super T> action,Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      public static void thenAcceptDemo(){
      CompletableFuture
      .completedFuture("task")
      .thenAcceptAsync(s -> {
      System.out.println("thenAcceptDemo:" + s);
      })
      .join();
      }
      --------- 输出 -----------
      thenAcceptDemo:task

    thenRun

    • 不再关心上一步运算的结果,直接进行下一步的运算
    • public CompletableFuture thenRun(Runnable action)
    • public CompletableFuture thenRunAsync(Runnable action)
    • public CompletableFuture thenRunAsync(Runnable action, Executor executor)
      1
      2
      3
      4
      5
      6
      7
      public static void thenRunDemo() {
      CompletableFuture.completedFuture("Task")
      .thenRun(() -> System.out.println("我不知道上面的参数,也不会继续往下传递值"))
      .join();
      }
      --------- 输出 -----------
      我不知道上面的参数,也不会继续往下传递值

    thenCombine

    • 结合前面两个Stage的结果,进行转化
    • public <U,V> CompletableFuture thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
    • public <U,V> CompletableFuture thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
    • public <U,V> CompletableFuture thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      public static void thenCombineDemo() {
      CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "task 1");
      CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "task 2");
      String result = task1.thenCombineAsync(task2, (t1, t2) -> t1 + " - "+ t2)
      .join();
      System.out.println(result);
      }
      --------- 输出 -----------
      task 1 - task 2

    thenAcceptBoth

    • 结合两个CompletionStage的结果,进行消耗,和thenCombine相比,只是少了返回值
    • public CompletableFuture thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
    • public CompletableFuture thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
    • public CompletableFuture thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      public static void thenAcceptBothDemo() {
      CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "task 1");
      CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "task 2");
      task1.thenAcceptBoth(task2, (t1, t2) -> System.out.println("thenAcceptBothDemo: " +t1 + " - " + t2))
      .join();
      }
      --------- 输出 -----------
      thenAcceptBothDemo: task 1 - task 2

    runAfterBoth

    • 在两个CompletionStage都运行完执行。
    • public CompletableFuture runAfterBoth(CompletionStage<?> other,Runnable action)
    • public CompletableFuture runAfterBothAsync(CompletionStage<?> other,Runnable action)
    • public CompletableFuture runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      public static void runAfterBothDemo() {
      CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
      sleep(100);
      println("task1 Done");
      });
      CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
      sleep(200);
      println("task2 Done");
      });
      task1.runAfterBoth(task2, () -> println("Task 1 And Task 2 Both Done"))
      .join();
      }
      --------- 输出 -----------
      2019-06-23 19:22:08.498: task1 Done
      2019-06-23 19:22:08.595: task2 Done
      2019-06-23 19:22:08.596: Task 1 And Task 2 Both Done

    applyToEither

    • 在两个CompletionStage中选择计算快的,将其结果进行下一步的转化操作。
    • public CompletableFuture applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
    • public CompletableFuture applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
    • public CompletableFuture applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      public static void applyToEitherDemo() {
      CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
      sleep(10);
      return "task 1";
      });
      CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
      sleep(20);
      return "task 2";
      });
      Integer result = task1.applyToEither(task2, t -> Integer.valueOf(t.split(" ")[1]))
      .join();
      System.out.println("applyToEitherDemo:" + result);
      }
      --------- 输出 -----------
      applyToEitherDemo:1

    acceptEither

    • 在两个CompletionStage中选择计算快的,作为下一步计算的结果。
    • public CompletableFuture acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
    • public CompletableFuture acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
    • public CompletableFuture acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      public static void acceptEitherDemo() {
      CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
      sleep(10);
      return "task 1";
      });
      CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
      sleep(20);
      return "task 2";
      });
      task1.acceptEitherAsync(task2, t -> System.out.println("acceptEitherDemo:" +Integer.valueOf(t.split(" ")[1])))
      .join();
      }
      --------- 输出 -----------
      acceptEitherDemo:1

    runAfterEither

    • 两个CompletionStage,任何一个完成了都会执行下一步的操作。
    • public CompletableFuture runAfterEither(CompletionStage<?> other,Runnable action)
    • public CompletableFuture runAfterEitherAsync(CompletionStage<?> other,Runnable action)
    • public CompletableFuture runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      public static void runAfterEitherDemo() {
      CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
      sleep(100);
      println("task1 Done");
      });
      CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
      sleep(200);
      println("task2 Done");
      });
      task1.runAfterEither(task2, () -> println("Task 1 Or Task 2 Done"))
      .join();
      sleep(100);
      }
      --------- 输出 -----------
      2019-06-23 19:24:27.644: task1 Done
      2019-06-23 19:24:27.645: Task 1 Or Task 2 Done
      2019-06-23 19:24:27.749: task2 Done

    thenCompose

    • 连接两个CompletableFuture,返回值是新的CompletableFuture
    • public CompletableFuture thenCompose(Function<? super T, ? extends CompletionStage> fn)
    • public CompletableFuture thenComposeAsync(Function<? super T, ? extends CompletionStage> fn)
    • public CompletableFuture thenComposeAsync(Function<? super T, ? extends CompletionStage> fn,Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      public static void thenComposeDemo() {
      String result = CompletableFuture.completedFuture("Start")
      .thenCompose(x -> CompletableFuture.supplyAsync(() -> {
      sleep(20);
      return x + " task 2";
      }))
      .thenCompose(x -> CompletableFuture.supplyAsync(() -> {
      sleep(10);
      return x + " task 1";
      }))
      .join();
      System.out.println("thenComposeDemo:" +result);
      }
      --------- 输出 -----------
      thenComposeDemo:Start task 2 task 1

    whenComplete

    • 当运行完成时,对结果的记录。
      • 正常执行,返回值。
      • 异常抛出造成程序的中断
    • 注意,内部线程出现异常会抛到外层,导致外层线程产生异常。
    • public CompletableFuture whenComplete(BiConsumer<? super T, ? super Throwable> action)
    • public CompletableFuture whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
    • public CompletableFuture whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      public static void whenCompleteDemo() {
      CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
      sleep(10);
      return "task 1";
      });
      CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
      sleep(20);
      throw new RuntimeException("task2 RuntimeException");
      });
      String task1res = task1.whenComplete((res, exp) -> {
      println("task1 res:" + res);
      println("task1 exp:" + exp);
      }).join();
      println(task1res);
      String task2res = task2.whenComplete((res, exp) -> {
      println("task2 res:" + res);
      println("task2 exp:" + exp.getMessage());
      }).join();
      println(task2res);
      }
      --------- 输出 -----------
      2019-06-23 19:50:03.429 ForkJoinPool.commonPool-worker-1: task1 res:task 1
      2019-06-23 19:50:03.430 ForkJoinPool.commonPool-worker-1: task1 exp:null
      2019-06-23 19:50:03.430 main: task 1
      2019-06-23 19:50:03.439 ForkJoinPool.commonPool-worker-2: task2 res:null
      2019-06-23 19:50:03.439 ForkJoinPool.commonPool-worker-2: task2 exp:java.lang.RuntimeException: task2 RuntimeException
      Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException: task2 RuntimeException
      at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
      at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
      at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
      at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
      at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
      at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
      at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
      at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
      Caused by: java.lang.RuntimeException: task2 RuntimeException
      at com.example.demo.ThreadTest.lambda$whenCompleteDemo$5(ThreadTest.java:44)
      at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
      ... 5 more

    handle

    • 运行完成时,对结果的处理。这里的完成时有两种情况,
      • 正常执行,返回值
      • 遇到异常抛出造成程序的中断。
    • 异常不会被抛到外层,不会造成外部线程因为异常中断
    • public CompletableFuture handle(BiFunction<? super T, Throwable, ? extends U> fn)
    • public CompletableFuture handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
    • public CompletableFuture handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      public static void handleDemo() {
      CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
      sleep(10);
      return "task 1";
      });
      CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
      sleep(20);
      throw new RuntimeException("task2 RuntimeException");
      });
      String task1res = task1.handle((res, exp) -> {
      println("task1 res:" + res);
      println("task1 exp:" + exp);
      return res;
      }).join();
      println(task1res);
      String task2res = task2.handle((res, exp) -> {
      println("task2 res:" + res);
      println("task2 exp:" + exp.getMessage());
      return res;
      }).join();
      println(task2res);
      }
      --------- 输出 -----------
      2019-06-23 19:50:53.542 ForkJoinPool.commonPool-worker-1: task1 res:task 1
      2019-06-23 19:50:53.543 ForkJoinPool.commonPool-worker-1: task1 exp:null
      2019-06-23 19:50:53.543 main: task 1
      2019-06-23 19:50:53.552 ForkJoinPool.commonPool-worker-2: task2 res:null
      2019-06-23 19:50:53.552 ForkJoinPool.commonPool-worker-2: task2 exp:java.lang.RuntimeException: task2 RuntimeException
      2019-06-23 19:50:53.552 main: null

    exceptionally

    • 异常处理逻辑,可以设置异常情况下的返回值
    • public CompletionStage exceptionally(Function<Throwable, ? extends T> fn)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      public static void exceptionallyDemo() {
      Object result = CompletableFuture.supplyAsync(() -> {
      sleep(20);
      throw new RuntimeException("task2 RuntimeException");
      }).exceptionally(e->{
      System.out.println("exceptionally: " +e);
      return e.getMessage();
      }).join();
      System.out.println("exceptionallyDemo: " +result);
      }
      --------- 输出 -----------
      exceptionally: java.util.concurrent.CompletionException: java.lang.RuntimeException: task2 RuntimeException
      exceptionallyDemo: java.lang.RuntimeException: task2 RuntimeException

    其他方法

    • public boolean isDone()

      • 是否已经完成(包括正常完成,异常完成,取消完成)
    • public T get() throws InterruptedException, ExecutionException

      • 阻塞方式获取结果
    • public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException

      • 带超时方式的阻塞方式获取结果
    • public T join()

      • 阻塞至任务完成。会抛出CompletionException(unchecked类型)
    • public T getNow(T valueIfAbsent)

      • 立即获取结果,如果任务没有执行完成,则返回valueIfAbsent
    • public boolean complete(T value)

      • 如果任务还没有执行完成,则用当前值去替换完成值,否则继续使用原始值。
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        private static void completeDemo() {
        println("complete start");
        CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
        println("runAsync start");
        sleep(2000);
        println("runAsync end");
        return "task run finish";
        });
        boolean complete = task.complete("finish");
        System.out.println("complete:"+complete);
        System.out.println("task:"+task.join());
        println("complete end");
        }
        --------- 输出 -----------
        2019-06-23 20:55:19.491 main: complete start
        complete:true
        task:finish
        2019-06-23 20:55:19.547 main: complete end
        ------------------------------------------------------------------------
        private static void completeDemo() {
        println("complete start");
        CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
        println("runAsync start");
        sleep(1);
        println("runAsync end");
        return "task run finish";
        });
        sleep(10);
        boolean complete = task.complete("finish");
        System.out.println("complete:"+complete);
        System.out.println("task:"+task.join());
        println("complete end");
        }
        --------- 输出 -----------
        2019-06-23 20:59:32.375 main: complete start
        2019-06-23 20:59:32.426 ForkJoinPool.commonPool-worker-1: runAsync start
        2019-06-23 20:59:32.428 ForkJoinPool.commonPool-worker-1: runAsync end
        complete:false
        task:task run finish
        2019-06-23 20:59:32.437 main: complete end
    • public boolean completeExceptionally(Throwable ex)

      • 如果任务还没有执行完成,则以异常的方式中断执行(调用join方法会抛出该异常),如果执行完成,则返回false,并正常执行
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
            private static void completeExceptionallyDemo() {
        println("completeExceptionally start");
        CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
        println("supplyAsync start");
        sleep(1);
        println("supplyAsync end");
        return "task run finish";
        });
        sleep(10);
        boolean complete = task.completeExceptionally(new NullPointerException("Test Null"));
        System.out.println("complete:"+complete);
        System.out.println("task:"+task.join());
        println("complete 1 end");

        task = CompletableFuture.supplyAsync(() -> {
        println("supplyAsync start");
        sleep(1);
        println("supplyAsync end");
        return "task run finish";
        });
        complete = task.completeExceptionally(new NullPointerException("Test Null"));
        System.out.println("complete:"+complete);
        System.out.println("task:"+task.join());
        }
        --------- 输出 -----------
        2019-06-23 21:11:48.276 main: completeExceptionally start
        2019-06-23 21:11:48.330 ForkJoinPool.commonPool-worker-1: supplyAsync start
        2019-06-23 21:11:48.331 ForkJoinPool.commonPool-worker-1: supplyAsync end
        complete:false
        task:task run finish
        2019-06-23 21:11:48.340 main: complete 1 end
        complete:true
        Exception in thread "main" java.util.concurrent.CompletionException: java.lang.NullPointerException: Test Null
        at java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
        at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1934)
        at com.example.demo.ThreadTest.completeExceptionallyDemo(ThreadTest.java:35)
        at com.example.demo.ThreadTest.main(ThreadTest.java:9)
        Caused by: java.lang.NullPointerException: Test Null
        at com.example.demo.ThreadTest.completeExceptionallyDemo(ThreadTest.java:33)
        ... 1 more
    • public CompletableFuture toCompletableFuture()

      • 返回CompletableFuture对象,实际代码中返回this
    • public boolean cancel(boolean mayInterruptIfRunning)

      • 取消任务,mayInterruptIfRunning在当前实现没有任何作用。。(醉了)
      • 任务取消后如果执行join方法会抛出CancellationException异常。
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        private static void cancelDemo() {
        CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
        @Override
        public void run() {
        println("Start");
        sleep(1000);
        println("End");

        }
        });
        sleep(10);
        boolean cancel = future.cancel(true);
        println(cancel);
        System.out.println("----------------------------------");
        future = CompletableFuture.runAsync(new Runnable() {
        @Override
        public void run() {
        println("Start");
        sleep(10);
        println("End");

        }
        });
        sleep(100);
        cancel = future.cancel(true);
        println(cancel);
        }
        --------- 输出 -----------
        2019-06-23 21:56:33.060 ForkJoinPool.commonPool-worker-1: Start
        2019-06-23 21:56:33.072 main: true
        ----------------------------------
        2019-06-23 21:56:33.075 ForkJoinPool.commonPool-worker-2: Start
        2019-06-23 21:56:33.086 ForkJoinPool.commonPool-worker-2: End
        2019-06-23 21:56:33.176 main: false
    • public boolean isCancelled()

      • 返回当前任务是否已经被取消
    • public boolean isCompletedExceptionally()

      • 返回当前任务是否异常方式中断
    • public void obtrudeValue(T value)

      • 将future的结果强制更改为value,无论是否发生异常
    • public void obtrudeException(Throwable ex)

      • 将future的结果强制更改为异常,只要调用get或者join均会抛出该异常,同时会修改isCompletedExceptionally的结果
    • public int getNumberOfDependents()

      • 返回有多少个后续stage依赖当前stage
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        private static void getNumberOfDependentsDemo() {
        CompletableFuture<String> t1 = CompletableFuture.supplyAsync(() -> {
        sleep(1000);
        return "1";
        });
        CompletableFuture<String> t2 = CompletableFuture.supplyAsync(() -> {
        sleep(1000);
        return "2";
        });
        sleep(10);
        println(t1.getNumberOfDependents());
        println(t2.getNumberOfDependents());
        CompletableFuture<Void> all = CompletableFuture.allOf(t1, t2);
        println(all.isDone());
        println(t1.getNumberOfDependents());
        println(t2.getNumberOfDependents());
        all.join();
        println(t1.getNumberOfDependents());
        println(t2.getNumberOfDependents());
        }
        --------- 输出 -----------
        2019-06-23 22:26:45.132 main: 0
        2019-06-23 22:26:45.133 main: 0
        2019-06-23 22:26:45.133 main: false
        2019-06-23 22:26:45.133 main: 1
        2019-06-23 22:26:45.133 main: 1
        2019-06-23 22:26:46.122 main: 0
        2019-06-23 22:26:46.122 main: 0

      参考


    select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。


    详解

    select

    • 基本原理:
      • select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。
      • 调用后select函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。
      • 当select函数返回后,可以通过遍历fdset,来找到就绪的描述符。
    • select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。
    • select最大的缺陷就是单个进程所打开的FD是有一定限制的,它由FD_SETSIZE设置,默认值是1024。
      • 一般来说这个数目和系统内存关系很大,具体数目可以cat /proc/sys/fs/file-max察看。32位机默认是1024个。64位机默认是2048.
    • 对socket进行扫描时是线性扫描,即采用轮询的方法,效率较低。
      • 当套接字比较多的时候,每次select()都要通过遍历FD_SETSIZE个Socket来完成调度,不管哪个Socket是活跃的,都遍历一遍。这会浪费很多CPU时间。如果能给套接字注册某个回调函数,当他们活跃时,自动完成相关操作,那就避免了轮询,这正是epoll与kqueue做的。
    • 需要维护一个用来存放大量fd的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大。
    • 简述:1024

    poll

    • 基本原理:
      • poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态。
      • 如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后它又要再次遍历fd。
      • 这个过程经历了多次无谓的遍历。
    • pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。
    • 同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。
    • 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。
    • poll还有一个特点是“水平触发”,如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。
    • 从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。
    • 事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。
    • 简述:鸡肋

    epoll

    • epoll是在2.6内核中提出的,是之前的select和poll的增强版本。
    • 相对于select和poll来说,epoll更加灵活,没有描述符限制。
    • epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。
    • 没有最大并发连接的限制,能打开的FD的上限远大于1024(1G的内存上能监听约10万个端口)。
    • 效率提升,不是轮询的方式,不会随着FD数目的增加效率下降。
      • 只有活跃可用的FD才会调用callback函数;即Epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll。
    • 内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减少复制开销。
    • 简述:杀手锏

    epoll工作模式

    • epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:
      • LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。
      • ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。
    LT模式
    • LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的。
    ET模式
    • ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)
    • ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。

    epoll总结

    • 在 select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一 个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait() 时便得到通知。(此处去掉了遍历文件描述符,而是通过监听回调的的机制。这正是epoll的魅力所在。)
    • epoll的优点主要是一下几个方面:
      • 监视的描述符数量不受限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左 右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。select的最大缺点就是进程打开的fd是有数量限制的。这对 于连接数量比较大的服务器来说根本不能满足。虽然也可以选择多进程的解决方案( Apache就是这样实现的),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。
      • IO的效率不会随着监视fd的数量的增长而下降。epoll不同于select和poll轮询的方式,而是通过每个fd定义的回调函数来实现的。只有就绪的fd才会执行回调函数。
      • 如果没有大量的idle -connection或者dead-connection,epoll的效率并不会比select/poll高很多,但是当遇到大量的idle- connection,就会发现epoll的效率大大高于select/poll。

    select、poll、epoll区别

    1、支持一个进程所能打开的最大连接数
    最大连接数
    2、FD剧增后带来的IO效率问题
    IO效率问题
    3、消息传递方式
    消息传递方式


    • 我们知道现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。

    • 操心系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核,保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。

    • 针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

    • 每个进程可以通过系统调用进入内核,因此,Linux内核由系统内的所有进程共享。于是,从具体进程的角度来看,每个进程可以拥有4G字节的虚拟空间。

    • 空间分配如下图所示:
      空间分配图

    • 有了用户空间和内核空间,整个linux内部结构可以分为三部分,从最底层到最上层依次是:硬件–>内核空间–>用户空间。如下图所示:
      linux内部结构图

    • 需要注意的细节问题:

      • 内核空间中存放的是内核代码和数据,而进程的用户空间中存放的是用户程序的代码和数据。不管是内核空间还是用户空间,它们都处于虚拟空间中。
      • Linux使用两级保护机制:0级供内核使用,3级供用户程序使用。
    • 内核态与用户态:

      • 当一个任务(进程)执行系统调用而陷入内核代码中执行时,称进程处于内核运行态(内核态)。此时处理器处于特权级最高的(0级)内核代码中执行。当进程处于内核态时,执行的内核代码会使用当前进程的内核栈。每个进程都有自己的内核栈。
      • 当进程在执行用户自己的代码时,则称其处于用户运行态(用户态)。此时处理器在特权级最低的(3级)用户代码中运行。当正在执行用户程序而突然被中断程序中断时,此时用户程序也可以象征性地称为处于进程的内核态。因为中断处理程序将使用当前进程的内核栈。
    • 参考资料:

    核心组件

    Channel

    • Nio Channel类似于Java Stream,但又有几点不同
      • Channel是双向的,Stream是单向的
      • Channel可以非阻塞的进行读写操作,而Stream需要等待io操作完成,也就是阻塞的。
      • Channel的读操作或者写操作都是依赖Buffer的,Stream没有依赖

    ServerSocketChannel

    • Java NIO中的 ServerSocketChannel 是一个可以监听新进来的TCP连接的通道, 就像标准IO中的ServerSocket一样。
    • ServerSocketChannel类在 java.nio.channels包中。

    SocketChannel

    • Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道。
    • 可以通过以下2种方式创建SocketChannel:
      • 打开一个SocketChannel并连接到互联网上的某台服务器。
      • 一个新连接到达ServerSocketChannel时,会创建一个SocketChannel。
    • 非阻塞模式与选择器搭配会工作的更好,通过将一或多个SocketChannel注册到Selector,可以询问选择器哪个通道已经准备好了读取,写入等。

    DatagramChannel

    • Java NIO中的DatagramChannel是一个能收发UDP包的通道。
    • 因为UDP是无连接的网络协议,所以不能像其它通道那样读取和写入。
    • 它发送和接收的是数据包。

    FileChannel

    • Java NIO中的FileChannel是一个连接到文件的通道。可以通过文件通道读写文件。
    • FileChannel无法设置为非阻塞模式,它总是运行在阻塞模式下。

    Buffer

    • 一个 Buffer,本质上是内存中的一块,我们可以将数据写入这块内存,之后从这块内存获取数据。通过将这块内存封装成 NIO Buffer 对象,并提供了一组常用的方法,方便我们对该块内存的读写。
    • 基本属性
      • capacity
        • 属性,容量,Buffer 能容纳的数据元素的最大值。这一容量在 Buffer 创建时被赋值,并且永远不能被修改。
      • limit
        • 属性,上限。
        • 写模式下,代表最大能写入的数据上限位置,这个时候 limit 等于 capacity 。
        • 读模式下,在 Buffer 完成所有数据写入后,通过调用 #flip() 方法,切换到读模式。此时,limit 等于 Buffer 中实际的数据大小。因为 Buffer 不一定被写满,所以不能使用 capacity 作为实际的数据大小。
      • position
        • position 属性,位置,初始值为 0 。
        • 写模式下,每往 Buffer 中写入一个值,position 就自动加 1 ,代表下一次的写入位置。
        • 读模式下,每从 Buffer 中读取一个值,position 就自动加 1 ,代表下一次的读取位置。( 和写模式类似 )
      • mark
        • 属性,标记,通过 #mark() 方法,记录当前 position ;通过 reset() 方法,恢复 position 为标记。
        • 写模式下,标记上一次写位置。
        • 读模式下,标记上一次读位置。
      • 关系
        • mark <= position <= limit <= capacity
    • 创建Buffer
      • 每个 Buffer 实现类,都提供了 #allocate(int capacity) 静态方法,帮助我们快速实例化一个 Buffer 对象。
        • ByteBuffer 实际是个抽象类,返回的是它的基于堆内( Non-Direct )内存的实现类 HeapByteBuffer 的对象。
      • 每个 Buffer 实现类,都提供了 #wrap(array) 静态方法,帮助我们将其对应的数组包装成一个 Buffer 对象。
        • 和 #allocate(int capacity) 静态方法一样,返回的也是 HeapByteBuffer 的对象。
      • 每个 Buffer 实现类,都提供了 #allocateDirect(int capacity) 静态方法,帮助我们快速实例化一个 Buffer 对象。
        • 和 #allocate(int capacity) 静态方法不一样,返回的是它的基于堆外( Direct )内存的实现类 DirectByteBuffer 的对象。
    • 向 Buffer 写入数据
      • 每个 Buffer 实现类,都提供了 #put(…) 方法,向 Buffer 写入数据。
      • 对于 Buffer 来说,有一个非常重要的操作就是,我们要讲来自 Channel 的数据写入到 Buffer 中。
      • 在系统层面上,这个操作我们称为读操作,因为数据是从外部( 文件或者网络等 )读取到内存中。
      • 通常在说 NIO 的读操作的时候,我们说的是从 Channel 中读数据到 Buffer 中,对应的是对 Buffer 的写入操作
    • 从 Buffer 读取数据
      • 每个 Buffer 实现类,都提供了 #get(…) 方法,从 Buffer 读取数据。
      • 对于 Buffer 来说,还有一个非常重要的操作就是,我们要讲来向 Channel 的写入 Buffer 中的数据。
      • 在系统层面上,这个操作我们称为写操作,因为数据是从内存中写入到外部( 文件或者网络等 )。
    • rewind() flip() clear()
      • flip
        • 如果要读取 Buffer 中的数据,需要切换模式,从写模式切换到读模式。
      • rewind
        • 可以重置 position 的值为 0 。因此,我们可以重新读取和写入 Buffer 了。
        • 该方法主要针对于读模式,所以可以翻译为“倒带”。
      • clear
        • 可以“重置” Buffer 的数据。因此,我们可以重新读取和写入 Buffer 了。
        • 该方法主要针对于写模式。
        • Buffer 的数据实际并未清理掉
    • mark() 搭配 reset()
      • mark
        • 保存当前的 position 到 mark 中。
      • reset
        • 恢复当前的 postion 为 mark 。

    关于 Direct Buffer 和 Non-Direct Buffer 的区别

    • Direct Buffer:
      • 所分配的内存不在 JVM 堆上, 不受 GC 的管理.(但是 Direct Buffer 的 Java 对象是由 GC 管理的, 因此当发生 GC, 对象被回收时, Direct Buffer 也会被释放)
      • 因为 Direct Buffer 不在 JVM 堆上分配, 因此 Direct Buffer 对应用程序的内存占用的影响就不那么明显(实际上还是占用了这么多内存, 但是 JVM 不好统计到非 JVM 管理的内存.)
      • 申请和释放 Direct Buffer 的开销比较大. 因此正确的使用 Direct Buffer 的方式是在初始化时申请一个 Buffer, 然后不断复用此 buffer, 在程序结束后才释放此 buffer.
      • 使用 Direct Buffer 时, 当进行一些底层的系统 IO 操作时, 效率会比较高, 因为此时 JVM 不需要拷贝 buffer 中的内存到中间临时缓冲区中.
    • Non-Direct Buffer:
      • 直接在 JVM 堆上进行内存的分配, 本质上是 byte[] 数组的封装.
      • 因为 Non-Direct Buffer 在 JVM 堆中, 因此当进行操作系统底层 IO 操作中时, 会将此 buffer 的内存复制到中间临时缓冲区中. 因此 Non-Direct Buffer 的效率就较低.

    Selector

    • Selector , 一般称为选择器。它是 Java NIO 核心组件中的一个,用于轮询一个或多个 NIO Channel 的状态是否处于可读、可写。如此,一个线程就可以管理多个 Channel ,也就说可以管理多个网络连接。也因此,Selector 也被称为多路复用器。
    • 那么 Selector 是如何轮询的呢?
      • 首先,需要将 Channel 注册到 Selector 中,这样 Selector 才知道哪些 Channel 是它需要管理的。
      • 之后,Selector 会不断地轮询注册在其上的 Channel 。如果某个 Channel 上面发生了读或者写事件,这个 Channel 就处于就绪状态,会被 Selector 轮询出来,然后通过 SelectionKey 可以获取就绪 Channel 的集合,进行后续的 I/O 操作。
    • 优缺点
      • 优点
        • 使用一个线程能够处理多个 Channel 的优点是,只需要更少的线程来处理 Channel 。
        • 事实上,可以使用一个线程处理所有的 Channel 。
        • 对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源( 例如 CPU、内存 )。因此,使用的线程越少越好。
      • 缺点
        • 因为在一个线程中使用了多个 Channel ,因此会造成每个 Channel 处理效率的降低。
    • 创建 Selector
      • 通过 #open() 方法,我们可以创建一个 Selector 对象。代码如下:
    • 注册 Chanel 到 Selector 中
      • 为了让 Selector 能够管理 Channel ,我们需要将 Channel 注册到 Selector 中。
      • 如果一个 Channel 要注册到 Selector 中,那么该 Channel 必须是非阻塞。
      • FileChannel 是不能够注册到 Channel 中的,因为它是阻塞的。
      • 监听四种不同类型的事件:
        • Connect :连接完成事件( TCP 连接 ),仅适用于客户端,对应 SelectionKey.OP_CONNECT 。
        • Accept :接受新连接事件,仅适用于服务端,对应 SelectionKey.OP_ACCEPT 。
        • Read :读事件,适用于两端,对应 SelectionKey.OP_READ ,表示 Buffer 可读。
        • Write :写时间,适用于两端,对应 SelectionKey.OP_WRITE ,表示 Buffer 可写。
      • Channel 触发了一个事件,意思是该事件已经就绪:
        • 一个 Client Channel Channel 成功连接到另一个服务器,称为“连接就绪”。
        • 一个 Server Socket Channel 准备好接收新进入的连接,称为“接收就绪”。
        • 一个有数据可读的 Channel ,可以说是“读就绪”。
        • 一个等待写数据的 Channel ,可以说是“写就绪”。
    • SelectionKey 类
      • 调用 Channel 的 #register(…) 方法,向 Selector 注册一个 Channel 后,会返回一个 SelectionKey 对象。
      • SelectionKey 在 java.nio.channels 包下,被定义成一个抽象类,表示一个 Channel 和一个 Selector 的注册关系。
      • 注册关系,包含如下内容:
        • interest set: 感兴趣的事件集合。
        • ready set :就绪的事件集合。
        • Channel
        • Selector
        • attachment :可选的附加对象。可以向 SelectionKey 添加附加对象。
    • 通过 Selector 选择 Channel
      • 在 Selector 中,提供三种类型的选择( select )方法,返回当前有感兴趣事件准备就绪的 Channel 数量。
        • select() 阻塞到至少有一个 Channel 在你注册的事件上就绪了。
        • select(long timeout) 在 #select() 方法的基础上,增加超时机制。
        • selectNow() 和 #select() 方法不同,立即返回数量,而不阻塞。
      • select 方法返回的 int 值,表示有多少 Channel 已经就绪。也就是自上次调用 select 方法后有多少 Channel 变成就绪状态。
    • 获取可操作的 Channel
      • 一旦调用了 select 方法,并且返回值表明有一个或更多个 Channel 就绪了,然后可以通过调用Selector 的 #selectedKeys() 方法,访问“已选择键集( selected key set )”中的就绪 Channel 。
      • 注意,当有新增就绪的 Channel ,需要先调用 select 方法,才会添加到“已选择键集( selected key set )”中。否则,我们直接调用 #selectedKeys() 方法,是无法获得它们对应的 SelectionKey 们。
    • 唤醒 Selector 选择
      • 某个线程调用 #select() 方法后,发生阻塞了,即使没有通道已经就绪,也有办法让其从 #select() 方法返回。
      • 只要让其它线程在第一个线程调用 select() 方法的那个 Selector 对象上,调用该 Selector 的 #wakeup() 方法,进行唤醒该 Selector 即可。
      • 注意,如果有其它线程调用了 #wakeup() 方法,但当前没有线程阻塞在 #select() 方法上,下个调用 #select() 方法的线程会立即被唤醒。
    • 关闭 Selector
      • 当我们不再使用 Selector 时,可以调用 Selector 的 #close() 方法,将它进行关闭。
        • Selector 相关的所有 SelectionKey 都会失效。
        • Selector 相关的所有 Channel 并不会关闭。
      • 此时若有线程阻塞在 #select() 方法上,也会被唤醒返回。

    NIO与BIO相比

    NIO

    • 基于缓冲区
      • 基于Buffer读取,将数据从Channel中读取到Buffer中,或者从buffer中将数据写回到channel中。因为数据已经读取到缓冲区当中,所以操作不需要顺序执行,增加其灵活性。
    • 非阻塞IO
      • 一个线程从channel中执行io操作的时候,无论是读取还是写入,都无需等待完成,都会直接返回,不会阻塞当前正在执行的线程。
    • 有选择器
      • 一个线程可以通过一个Selector管理多个Channel,选择器是实现非阻塞io的核心。
      • Selector内部自动为我们实现了轮训select操作,判断channel是否有已经就绪的io事件(连接,读,写等)

    BIO

    • 基于流(Stream)
      • 以流式方式进行处理,顺序的从一个stream中读取一个或者多个字节,直到读取完成。由于没有缓存区,不能随意更改读取指针的位置。
    • 阻塞IO
      • 一个线程操作io的时候,该线程会被阻塞,直到数据被读取或者写入完成。

    根据加锁的范围,MySQL里面的锁大致可以分为全局锁,表级锁和行级锁。

    全局锁

    • MySql提供了一个加全局锁的方法,Flush tables with read lock(FTWRL)
    • 适用场景:全库逻辑备份
    • 阻塞: 数据更新语句,数据定义语句,更新类事务提交语句

    表级锁

    • MySql中表级锁有两种:表锁 和 元数据锁

    表锁

    • 表锁语法是: lock tables … read/write
    • 对于InnoDB这种支持行锁的引擎,一般不使用lock tables方式控制并发

    元数据锁(matedata lock,MDL)

    • MDL不需要显示使用,在访问一个表的时候会被自动加上。
    • 作用:保证读写的正确性。
    • 增删改查操作自动加MDL读锁,修改表结构的时候加MDL写锁

    幻读

    • 问题: 即使把所有的记录都加上锁,也还是阻止不了新插入的记录。

    如何解决幻读问题

    • 产生幻读的原因是:行锁只能锁住行,新插入记录这个动作,要更新的是记录之间的间隙。
    • 解决办法: InnoDB引入了新的锁,间隙锁(Gap Lock)
    • 与间隙锁冲突的操作:往这个间隙中插入一条记录
    • 间隙锁和行锁合称:next-key lock

    加锁总结

    • 原则1:加锁的基本单位是next-key lock,是前开后闭区间。

    • 原则2:查找过程中访问到的对象才会加锁。

    • 优化1:索引上的等值查询,给唯一索引加锁的时候,next-key lock 退化为行锁。

    • 优化2:索引上的等值查询,向右遍历时且最后一个值不满足等值条件的时候,next-key lock退化为间隙锁。

    • Bug:唯一索引上的范围查询会访问到不满足条件的第一个值为止。

    • 锁是加在索引上的

    • 用lock in share mode来给行加读锁避免数据被更新的话,必须要绕过覆盖索引优化,查询字段中加入索引中不存在的字段。

    • 分析加锁规则的时候可以用next-key lock来进行分析,但是具体执行的时候,是要分成间隙锁和行锁两阶段来执行的。


    • 源:<极客时间> MySQL实战45讲教程

    索引

    索引常见模型

    • 哈希表
    • 适用于只有等值查询的场景
    • 有序数组
    • 适用于等值查询,范围查询
    • 更新成本高,适用于静态存储引擎
    • 搜索树
    • 查询复杂度O(log(N))
    • 更新操作复杂度O(log(N))
    • 为了适配磁盘,往往使用N叉树

    InnoDB索引模型

    • 表都是根据主键顺序以索引形式存放。

    • 使用了B+树索引模型,数据存储在B+树中。

    • 根据叶子节点内容,索引类型分主键索引和非主键索引

    • 主键索引叶子节点存放的是整行的数据,也成为聚簇索引。

    • 非主键索引叶子节点存放的是主键的值,非主键索引也成为二级索引

    • 区别:基于非主键索引的查询需要多扫描一颗索引树。

    索引维护

    • 一个数据页满了,按照B+Tree算法,会新增加一个数据页,这个过程称为页分裂,会导致性能下降,空间利用率降低大概一半。
    • 两个相邻的数据页利用率如果都很低,会做数据合并,也就是页分裂逆过程
    • B+树的插入可能会引起数据页的分裂,删除可能会引起数据页的合并,二者都是比较重的IO消耗,所以比较好的方式是顺序插入数据,这也是我们一般使用自增主键的原因之一
    • 在Key-Value的场景下,只有一个索引且是唯一索引,则适合直接使用业务字段作为主键索引
    • 非主键索引的叶子结点存储的是主键的值,所以主键字段占用空间不宜过大。同时,其查找数据的过程称为“回表”,需要先查找自己得到主键值,再在主键索引上边查找数据内容。
    • 索引的实现由存储引擎来决定,InnoDB使用B+树(N叉树,比如1200叉树),把整颗树的高度维持在很小的范围内,同时在内存里缓存前面若干层的节点,可以极大地降低访问磁盘的次数,提高读的效率。

    覆盖索引

    • 回到主键索引树搜索的过程,我们称为回表
    • 由于覆盖索引可以减少树的搜索次数,显著提升查询性能,所以使用覆盖索引是一个常用的性能优化手段。

    最左前缀原则

    • B+ 树这种索引结构,可以利用索引的“最左前缀”,来定位记录。
    • 在建立联合索引的时候,如何安排索引内的字段顺序
    • 第一原则,如果通过调整顺序,可以少维护一个索引,那么这个顺序往往就是最需要有限考虑的。
    • 再次考虑空间

    索引下推

    • MySQL 5.6 引入的索引下推优化,可以在索引遍历过程中,对索引中包含的字段先做判断,直接过滤掉不满足条件的记录,减少回表次数。

    小结

    • 满足语句需求的情况下, 尽量少地访问资源是数据库设计的重要原则之一。
    • 设计表结构时,也要以减少资源消耗作为目标。

    • 源:<极客时间> MySQL实战45讲教程

    事务隔离级别

    问题

    • 脏读
    • 可重复读
    • 幻读

    隔离级别

    • 读未提交
    • 一个事务还没提交的时候,其所作的变更可以被其他事务看到。
    • 读提交
    • 一个事务提交后,他做的变更才会被其他事务看到。
    • 可重复读
    • 一个事务执行的过程中看到的数据,总是跟这个事务在启动时看到的数据是一致的。
    • 串行化
    • 对于同一行记录,写会加写锁,读会加读锁,一旦出现读写锁冲突的时候,后访问的事务必须等前一个事务完成才能继续执行。

    隔离级别实现

    • 数据库里面会创建一个视图,访问的时候以这个视图的逻辑结果为准。
    • 可重复读,视图在事务启动时创建,这个事务存在期间都在用这个视图。
    • 读提交,视图实在每个sql语句开始执行的时候创建
    • 读未提交,直接返回记录最新值,没有视图概念
    • 串行化,直接用加锁的方式避免并行访问
    • 隔离的实现
    • 每条记录在更新的时候都会同事记录一条回滚操作。记录上的最新值,通过回滚操作都可以得到前一个状态的值。
    • 系统会在没有实物需要使用到这些回滚日志的时候,删除回滚日志。
    • 不要使用长事务
    • 长事务意味着系统里面会存在很老的事务视图。由于这些事务随时可能访问数据库里面的任何数据,所以这个事务提交之前,数据库里面会存储他可能用到的所有回滚记录,导致占用大量的存储空

    • 源:<极客时间> MySQL实战45讲教程