0%

CompletableFuture 学习汇总

static方法

public static CompletableFuture supplyAsync(Supplier supplier)

  • 提交一个Supplier任务,异步执行,可以获取任务返回结果,使用ForkJoinPool.commonPool()执行任务。

public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

  • 提交一个Supplier任务,异步执行,可以获取任务返回结果,使用指定的线程池执行

public static CompletableFuture runAsync(Runnable runnable)

  • 提交一个Runnable任务,异步执行,无返回结果,使用ForkJoinPool.commonPool()执行任务。

public static CompletableFuture runAsync(Runnable runnable, Executor executor)

  • 提交一个Supplier任务,异步执行,无返回结果,使用指定的线程池执行

public static CompletableFuture completedFuture(U value)

  • 新建一个完成的CompletableFuture,通常作为计算的起点阶段。

public static CompletableFuture allOf(CompletableFuture<?>… cfs)

  • 接收一个由CompletableFuture 构成的数组,需要等待多个 CompletableFuture 对象执行完毕,执行join操作可以等待CompletableFuture执行完成。

public static CompletableFuture anyOf(CompletableFuture<?>… cfs)
  • 接收一个由CompletableFuture 构成的数组,返回由第一个执行完毕的 CompletableFuture 对象的返回值构成的CompletableFuture

    示例代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ThreadTest1 {
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    //创建一个直接完成的CompletableFuture
    String now = CompletableFuture.completedFuture("Test")
    .getNow("Fail");
    println("completedFuture: " + now);
    //创建一个带有返回值的CompletableFuture
    CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
    sleep(100);
    return "Test";
    }, pool);
    //延迟100ms,这里获取是default值
    now = task.getNow("Fail");
    println("supplyAsync: now: " + now);
    //等待任务完成后输出
    println("supplyAsync: get: " + task.get());
    System.out.println("---------------------分割线----------------------");
    //耗时100ms的任务
    CompletableFuture<Void> task100 = CompletableFuture.runAsync(() -> {
    sleep(100);
    println("runAsync :" + Thread.currentThread().getName() + " task100 done");
    }, pool);
    //耗时200ms的任务
    CompletableFuture<Void> task200 = CompletableFuture.runAsync(() -> {
    sleep(200);
    println("runAsync :" + Thread.currentThread().getName() + " task200 done");
    }, pool);
    //任意一个完成就会继续执行
    CompletableFuture.anyOf(task100, task200).join();
    println("anyOf Done");
    //全部完成才会继续执行
    CompletableFuture.allOf(task100, task200).join();
    println("allOf Done");
    //关闭线程池
    pool.shutdown();
    }

    private static void sleep(long time) {
    try {
    Thread.sleep(time);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    private static void println(Object object){
    System.out.println(sdf.format(new Date()) + ": " + object);
    }
    }

    运行结果

    1
    2
    3
    4
    5
    6
    7
    8
    2019-06-23 18:02:03.552: completedFuture: Test
    2019-06-23 18:02:03.604: supplyAsync: now: Fail
    2019-06-23 18:02:03.710: supplyAsync: get: Test
    -------------------------------------------
    2019-06-23 18:02:03.813: runAsync :pool-1-thread-2 task100 done
    2019-06-23 18:02:03.813: anyOf Done
    2019-06-23 18:02:04.012: runAsync :pool-1-thread-3 task200 done
    2019-06-23 18:02:04.012: allOf Done

    实例方法

    • 实例方法整体比较规则,一个标准执行方法,一个异步执行方法,一个指定异步线程执行方法

    thenApply

    • then是指在当前阶段正常执行完成后(正常执行是指没有抛出异常)进行的操作。Apply是指将一个Function作用于之前阶段得出的结果(即将上一步的结果进行转换)
    • public CompletableFuture (Function<? super T,? extends U> fn)
    • public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn)
    • public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      public static void thenApplyAsyncDemo(){
      Integer integer = CompletableFuture
      .completedFuture("task 1")
      .thenApplyAsync(x -> {
      String intString = x.split(" ")[1];
      return Integer.valueOf(intString);
      })
      .join();
      System.out.println("thenApplyAsyncDemo: " + integer);
      }
      --------- 输出 -----------
      thenApplyAsyncDemo: 1

    thenAccept

    • 下一个Stage接收了当前Stage的结果但是在计算中无需返回值(可以简单认为这里就是消费终点,因为没有返回值。当然下一步不依赖当前返回值的情况除外)
    • public CompletableFuture thenAccept(Consumer<? super T> action)
    • public CompletableFuture thenAcceptAsync(Consumer<? super T> action)
    • public CompletableFuture thenAcceptAsync(Consumer<? super T> action,Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      public static void thenAcceptDemo(){
      CompletableFuture
      .completedFuture("task")
      .thenAcceptAsync(s -> {
      System.out.println("thenAcceptDemo:" + s);
      })
      .join();
      }
      --------- 输出 -----------
      thenAcceptDemo:task

    thenRun

    • 不再关心上一步运算的结果,直接进行下一步的运算
    • public CompletableFuture thenRun(Runnable action)
    • public CompletableFuture thenRunAsync(Runnable action)
    • public CompletableFuture thenRunAsync(Runnable action, Executor executor)
      1
      2
      3
      4
      5
      6
      7
      public static void thenRunDemo() {
      CompletableFuture.completedFuture("Task")
      .thenRun(() -> System.out.println("我不知道上面的参数,也不会继续往下传递值"))
      .join();
      }
      --------- 输出 -----------
      我不知道上面的参数,也不会继续往下传递值

    thenCombine

    • 结合前面两个Stage的结果,进行转化
    • public <U,V> CompletableFuture thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
    • public <U,V> CompletableFuture thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
    • public <U,V> CompletableFuture thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      public static void thenCombineDemo() {
      CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "task 1");
      CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "task 2");
      String result = task1.thenCombineAsync(task2, (t1, t2) -> t1 + " - "+ t2)
      .join();
      System.out.println(result);
      }
      --------- 输出 -----------
      task 1 - task 2

    thenAcceptBoth

    • 结合两个CompletionStage的结果,进行消耗,和thenCombine相比,只是少了返回值
    • public CompletableFuture thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
    • public CompletableFuture thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
    • public CompletableFuture thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      public static void thenAcceptBothDemo() {
      CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "task 1");
      CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "task 2");
      task1.thenAcceptBoth(task2, (t1, t2) -> System.out.println("thenAcceptBothDemo: " +t1 + " - " + t2))
      .join();
      }
      --------- 输出 -----------
      thenAcceptBothDemo: task 1 - task 2

    runAfterBoth

    • 在两个CompletionStage都运行完执行。
    • public CompletableFuture runAfterBoth(CompletionStage<?> other,Runnable action)
    • public CompletableFuture runAfterBothAsync(CompletionStage<?> other,Runnable action)
    • public CompletableFuture runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      public static void runAfterBothDemo() {
      CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
      sleep(100);
      println("task1 Done");
      });
      CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
      sleep(200);
      println("task2 Done");
      });
      task1.runAfterBoth(task2, () -> println("Task 1 And Task 2 Both Done"))
      .join();
      }
      --------- 输出 -----------
      2019-06-23 19:22:08.498: task1 Done
      2019-06-23 19:22:08.595: task2 Done
      2019-06-23 19:22:08.596: Task 1 And Task 2 Both Done

    applyToEither

    • 在两个CompletionStage中选择计算快的,将其结果进行下一步的转化操作。
    • public CompletableFuture applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
    • public CompletableFuture applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
    • public CompletableFuture applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      public static void applyToEitherDemo() {
      CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
      sleep(10);
      return "task 1";
      });
      CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
      sleep(20);
      return "task 2";
      });
      Integer result = task1.applyToEither(task2, t -> Integer.valueOf(t.split(" ")[1]))
      .join();
      System.out.println("applyToEitherDemo:" + result);
      }
      --------- 输出 -----------
      applyToEitherDemo:1

    acceptEither

    • 在两个CompletionStage中选择计算快的,作为下一步计算的结果。
    • public CompletableFuture acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
    • public CompletableFuture acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
    • public CompletableFuture acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      public static void acceptEitherDemo() {
      CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
      sleep(10);
      return "task 1";
      });
      CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
      sleep(20);
      return "task 2";
      });
      task1.acceptEitherAsync(task2, t -> System.out.println("acceptEitherDemo:" +Integer.valueOf(t.split(" ")[1])))
      .join();
      }
      --------- 输出 -----------
      acceptEitherDemo:1

    runAfterEither

    • 两个CompletionStage,任何一个完成了都会执行下一步的操作。
    • public CompletableFuture runAfterEither(CompletionStage<?> other,Runnable action)
    • public CompletableFuture runAfterEitherAsync(CompletionStage<?> other,Runnable action)
    • public CompletableFuture runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      public static void runAfterEitherDemo() {
      CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
      sleep(100);
      println("task1 Done");
      });
      CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
      sleep(200);
      println("task2 Done");
      });
      task1.runAfterEither(task2, () -> println("Task 1 Or Task 2 Done"))
      .join();
      sleep(100);
      }
      --------- 输出 -----------
      2019-06-23 19:24:27.644: task1 Done
      2019-06-23 19:24:27.645: Task 1 Or Task 2 Done
      2019-06-23 19:24:27.749: task2 Done

    thenCompose

    • 连接两个CompletableFuture,返回值是新的CompletableFuture
    • public CompletableFuture thenCompose(Function<? super T, ? extends CompletionStage> fn)
    • public CompletableFuture thenComposeAsync(Function<? super T, ? extends CompletionStage> fn)
    • public CompletableFuture thenComposeAsync(Function<? super T, ? extends CompletionStage> fn,Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      public static void thenComposeDemo() {
      String result = CompletableFuture.completedFuture("Start")
      .thenCompose(x -> CompletableFuture.supplyAsync(() -> {
      sleep(20);
      return x + " task 2";
      }))
      .thenCompose(x -> CompletableFuture.supplyAsync(() -> {
      sleep(10);
      return x + " task 1";
      }))
      .join();
      System.out.println("thenComposeDemo:" +result);
      }
      --------- 输出 -----------
      thenComposeDemo:Start task 2 task 1

    whenComplete

    • 当运行完成时,对结果的记录。
      • 正常执行,返回值。
      • 异常抛出造成程序的中断
    • 注意,内部线程出现异常会抛到外层,导致外层线程产生异常。
    • public CompletableFuture whenComplete(BiConsumer<? super T, ? super Throwable> action)
    • public CompletableFuture whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
    • public CompletableFuture whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      public static void whenCompleteDemo() {
      CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
      sleep(10);
      return "task 1";
      });
      CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
      sleep(20);
      throw new RuntimeException("task2 RuntimeException");
      });
      String task1res = task1.whenComplete((res, exp) -> {
      println("task1 res:" + res);
      println("task1 exp:" + exp);
      }).join();
      println(task1res);
      String task2res = task2.whenComplete((res, exp) -> {
      println("task2 res:" + res);
      println("task2 exp:" + exp.getMessage());
      }).join();
      println(task2res);
      }
      --------- 输出 -----------
      2019-06-23 19:50:03.429 ForkJoinPool.commonPool-worker-1: task1 res:task 1
      2019-06-23 19:50:03.430 ForkJoinPool.commonPool-worker-1: task1 exp:null
      2019-06-23 19:50:03.430 main: task 1
      2019-06-23 19:50:03.439 ForkJoinPool.commonPool-worker-2: task2 res:null
      2019-06-23 19:50:03.439 ForkJoinPool.commonPool-worker-2: task2 exp:java.lang.RuntimeException: task2 RuntimeException
      Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException: task2 RuntimeException
      at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
      at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
      at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
      at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
      at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
      at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
      at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
      at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
      Caused by: java.lang.RuntimeException: task2 RuntimeException
      at com.example.demo.ThreadTest.lambda$whenCompleteDemo$5(ThreadTest.java:44)
      at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
      ... 5 more

    handle

    • 运行完成时,对结果的处理。这里的完成时有两种情况,
      • 正常执行,返回值
      • 遇到异常抛出造成程序的中断。
    • 异常不会被抛到外层,不会造成外部线程因为异常中断
    • public CompletableFuture handle(BiFunction<? super T, Throwable, ? extends U> fn)
    • public CompletableFuture handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
    • public CompletableFuture handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      public static void handleDemo() {
      CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
      sleep(10);
      return "task 1";
      });
      CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
      sleep(20);
      throw new RuntimeException("task2 RuntimeException");
      });
      String task1res = task1.handle((res, exp) -> {
      println("task1 res:" + res);
      println("task1 exp:" + exp);
      return res;
      }).join();
      println(task1res);
      String task2res = task2.handle((res, exp) -> {
      println("task2 res:" + res);
      println("task2 exp:" + exp.getMessage());
      return res;
      }).join();
      println(task2res);
      }
      --------- 输出 -----------
      2019-06-23 19:50:53.542 ForkJoinPool.commonPool-worker-1: task1 res:task 1
      2019-06-23 19:50:53.543 ForkJoinPool.commonPool-worker-1: task1 exp:null
      2019-06-23 19:50:53.543 main: task 1
      2019-06-23 19:50:53.552 ForkJoinPool.commonPool-worker-2: task2 res:null
      2019-06-23 19:50:53.552 ForkJoinPool.commonPool-worker-2: task2 exp:java.lang.RuntimeException: task2 RuntimeException
      2019-06-23 19:50:53.552 main: null

    exceptionally

    • 异常处理逻辑,可以设置异常情况下的返回值
    • public CompletionStage exceptionally(Function<Throwable, ? extends T> fn)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      public static void exceptionallyDemo() {
      Object result = CompletableFuture.supplyAsync(() -> {
      sleep(20);
      throw new RuntimeException("task2 RuntimeException");
      }).exceptionally(e->{
      System.out.println("exceptionally: " +e);
      return e.getMessage();
      }).join();
      System.out.println("exceptionallyDemo: " +result);
      }
      --------- 输出 -----------
      exceptionally: java.util.concurrent.CompletionException: java.lang.RuntimeException: task2 RuntimeException
      exceptionallyDemo: java.lang.RuntimeException: task2 RuntimeException

    其他方法

    • public boolean isDone()

      • 是否已经完成(包括正常完成,异常完成,取消完成)
    • public T get() throws InterruptedException, ExecutionException

      • 阻塞方式获取结果
    • public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException

      • 带超时方式的阻塞方式获取结果
    • public T join()

      • 阻塞至任务完成。会抛出CompletionException(unchecked类型)
    • public T getNow(T valueIfAbsent)

      • 立即获取结果,如果任务没有执行完成,则返回valueIfAbsent
    • public boolean complete(T value)

      • 如果任务还没有执行完成,则用当前值去替换完成值,否则继续使用原始值。
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        private static void completeDemo() {
        println("complete start");
        CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
        println("runAsync start");
        sleep(2000);
        println("runAsync end");
        return "task run finish";
        });
        boolean complete = task.complete("finish");
        System.out.println("complete:"+complete);
        System.out.println("task:"+task.join());
        println("complete end");
        }
        --------- 输出 -----------
        2019-06-23 20:55:19.491 main: complete start
        complete:true
        task:finish
        2019-06-23 20:55:19.547 main: complete end
        ------------------------------------------------------------------------
        private static void completeDemo() {
        println("complete start");
        CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
        println("runAsync start");
        sleep(1);
        println("runAsync end");
        return "task run finish";
        });
        sleep(10);
        boolean complete = task.complete("finish");
        System.out.println("complete:"+complete);
        System.out.println("task:"+task.join());
        println("complete end");
        }
        --------- 输出 -----------
        2019-06-23 20:59:32.375 main: complete start
        2019-06-23 20:59:32.426 ForkJoinPool.commonPool-worker-1: runAsync start
        2019-06-23 20:59:32.428 ForkJoinPool.commonPool-worker-1: runAsync end
        complete:false
        task:task run finish
        2019-06-23 20:59:32.437 main: complete end
    • public boolean completeExceptionally(Throwable ex)

      • 如果任务还没有执行完成,则以异常的方式中断执行(调用join方法会抛出该异常),如果执行完成,则返回false,并正常执行
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
            private static void completeExceptionallyDemo() {
        println("completeExceptionally start");
        CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
        println("supplyAsync start");
        sleep(1);
        println("supplyAsync end");
        return "task run finish";
        });
        sleep(10);
        boolean complete = task.completeExceptionally(new NullPointerException("Test Null"));
        System.out.println("complete:"+complete);
        System.out.println("task:"+task.join());
        println("complete 1 end");

        task = CompletableFuture.supplyAsync(() -> {
        println("supplyAsync start");
        sleep(1);
        println("supplyAsync end");
        return "task run finish";
        });
        complete = task.completeExceptionally(new NullPointerException("Test Null"));
        System.out.println("complete:"+complete);
        System.out.println("task:"+task.join());
        }
        --------- 输出 -----------
        2019-06-23 21:11:48.276 main: completeExceptionally start
        2019-06-23 21:11:48.330 ForkJoinPool.commonPool-worker-1: supplyAsync start
        2019-06-23 21:11:48.331 ForkJoinPool.commonPool-worker-1: supplyAsync end
        complete:false
        task:task run finish
        2019-06-23 21:11:48.340 main: complete 1 end
        complete:true
        Exception in thread "main" java.util.concurrent.CompletionException: java.lang.NullPointerException: Test Null
        at java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
        at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1934)
        at com.example.demo.ThreadTest.completeExceptionallyDemo(ThreadTest.java:35)
        at com.example.demo.ThreadTest.main(ThreadTest.java:9)
        Caused by: java.lang.NullPointerException: Test Null
        at com.example.demo.ThreadTest.completeExceptionallyDemo(ThreadTest.java:33)
        ... 1 more
    • public CompletableFuture toCompletableFuture()

      • 返回CompletableFuture对象,实际代码中返回this
    • public boolean cancel(boolean mayInterruptIfRunning)

      • 取消任务,mayInterruptIfRunning在当前实现没有任何作用。。(醉了)
      • 任务取消后如果执行join方法会抛出CancellationException异常。
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        private static void cancelDemo() {
        CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
        @Override
        public void run() {
        println("Start");
        sleep(1000);
        println("End");

        }
        });
        sleep(10);
        boolean cancel = future.cancel(true);
        println(cancel);
        System.out.println("----------------------------------");
        future = CompletableFuture.runAsync(new Runnable() {
        @Override
        public void run() {
        println("Start");
        sleep(10);
        println("End");

        }
        });
        sleep(100);
        cancel = future.cancel(true);
        println(cancel);
        }
        --------- 输出 -----------
        2019-06-23 21:56:33.060 ForkJoinPool.commonPool-worker-1: Start
        2019-06-23 21:56:33.072 main: true
        ----------------------------------
        2019-06-23 21:56:33.075 ForkJoinPool.commonPool-worker-2: Start
        2019-06-23 21:56:33.086 ForkJoinPool.commonPool-worker-2: End
        2019-06-23 21:56:33.176 main: false
    • public boolean isCancelled()

      • 返回当前任务是否已经被取消
    • public boolean isCompletedExceptionally()

      • 返回当前任务是否异常方式中断
    • public void obtrudeValue(T value)

      • 将future的结果强制更改为value,无论是否发生异常
    • public void obtrudeException(Throwable ex)

      • 将future的结果强制更改为异常,只要调用get或者join均会抛出该异常,同时会修改isCompletedExceptionally的结果
    • public int getNumberOfDependents()

      • 返回有多少个后续stage依赖当前stage
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        private static void getNumberOfDependentsDemo() {
        CompletableFuture<String> t1 = CompletableFuture.supplyAsync(() -> {
        sleep(1000);
        return "1";
        });
        CompletableFuture<String> t2 = CompletableFuture.supplyAsync(() -> {
        sleep(1000);
        return "2";
        });
        sleep(10);
        println(t1.getNumberOfDependents());
        println(t2.getNumberOfDependents());
        CompletableFuture<Void> all = CompletableFuture.allOf(t1, t2);
        println(all.isDone());
        println(t1.getNumberOfDependents());
        println(t2.getNumberOfDependents());
        all.join();
        println(t1.getNumberOfDependents());
        println(t2.getNumberOfDependents());
        }
        --------- 输出 -----------
        2019-06-23 22:26:45.132 main: 0
        2019-06-23 22:26:45.133 main: 0
        2019-06-23 22:26:45.133 main: false
        2019-06-23 22:26:45.133 main: 1
        2019-06-23 22:26:45.133 main: 1
        2019-06-23 22:26:46.122 main: 0
        2019-06-23 22:26:46.122 main: 0

      参考