static方法
public static CompletableFuture supplyAsync(Supplier supplier)
- 提交一个Supplier任务,异步执行,可以获取任务返回结果,使用ForkJoinPool.commonPool()执行任务。
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
- 提交一个Supplier任务,异步执行,可以获取任务返回结果,使用指定的线程池执行
public static CompletableFuture runAsync(Runnable runnable)
- 提交一个Runnable任务,异步执行,无返回结果,使用ForkJoinPool.commonPool()执行任务。
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
- 提交一个Supplier任务,异步执行,无返回结果,使用指定的线程池执行
public static CompletableFuture completedFuture(U value)
- 新建一个完成的CompletableFuture,通常作为计算的起点阶段。
public static CompletableFuture allOf(CompletableFuture<?>… cfs)
- 接收一个由CompletableFuture 构成的数组,需要等待多个 CompletableFuture 对象执行完毕,执行join操作可以等待CompletableFuture执行完成。
public static CompletableFuture
- 接收一个由CompletableFuture 构成的数组,返回由第一个执行完毕的 CompletableFuture 对象的返回值构成的CompletableFuture
示例代码
1 | import java.text.SimpleDateFormat; |
运行结果
1 | 2019-06-23 18:02:03.552: completedFuture: Test |
实例方法
- 实例方法整体比较规则,一个标准执行方法,一个异步执行方法,一个指定异步线程执行方法
thenApply
- then是指在当前阶段正常执行完成后(正常执行是指没有抛出异常)进行的操作。Apply是指将一个Function作用于之前阶段得出的结果(即将上一步的结果进行转换)
- public CompletableFuture (Function<? super T,? extends U> fn)
- public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn)
- public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
1
2
3
4
5
6
7
8
9
10
11
12public static void thenApplyAsyncDemo(){
Integer integer = CompletableFuture
.completedFuture("task 1")
.thenApplyAsync(x -> {
String intString = x.split(" ")[1];
return Integer.valueOf(intString);
})
.join();
System.out.println("thenApplyAsyncDemo: " + integer);
}
--------- 输出 -----------
thenApplyAsyncDemo: 1
thenAccept
- 下一个Stage接收了当前Stage的结果但是在计算中无需返回值(可以简单认为这里就是消费终点,因为没有返回值。当然下一步不依赖当前返回值的情况除外)
- public CompletableFuture
thenAccept(Consumer<? super T> action) - public CompletableFuture
thenAcceptAsync(Consumer<? super T> action) - public CompletableFuture
thenAcceptAsync(Consumer<? super T> action,Executor executor) 1
2
3
4
5
6
7
8
9
10public static void thenAcceptDemo(){
CompletableFuture
.completedFuture("task")
.thenAcceptAsync(s -> {
System.out.println("thenAcceptDemo:" + s);
})
.join();
}
--------- 输出 -----------
thenAcceptDemo:task
thenRun
- 不再关心上一步运算的结果,直接进行下一步的运算
- public CompletableFuture
thenRun(Runnable action) - public CompletableFuture
thenRunAsync(Runnable action) - public CompletableFuture
thenRunAsync(Runnable action, Executor executor) 1
2
3
4
5
6
7public static void thenRunDemo() {
CompletableFuture.completedFuture("Task")
.thenRun(() -> System.out.println("我不知道上面的参数,也不会继续往下传递值"))
.join();
}
--------- 输出 -----------
我不知道上面的参数,也不会继续往下传递值
thenCombine
- 结合前面两个Stage的结果,进行转化
- public <U,V> CompletableFuture
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) - public <U,V> CompletableFuture
thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) - public <U,V> CompletableFuture
thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor) 1
2
3
4
5
6
7
8
9public static void thenCombineDemo() {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "task 1");
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "task 2");
String result = task1.thenCombineAsync(task2, (t1, t2) -> t1 + " - "+ t2)
.join();
System.out.println(result);
}
--------- 输出 -----------
task 1 - task 2
thenAcceptBoth
- 结合两个CompletionStage的结果,进行消耗,和thenCombine相比,只是少了返回值
- public CompletableFuture
thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) - public CompletableFuture
thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) - public CompletableFuture
thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor) 1
2
3
4
5
6
7
8public static void thenAcceptBothDemo() {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "task 1");
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "task 2");
task1.thenAcceptBoth(task2, (t1, t2) -> System.out.println("thenAcceptBothDemo: " +t1 + " - " + t2))
.join();
}
--------- 输出 -----------
thenAcceptBothDemo: task 1 - task 2
runAfterBoth
- 在两个CompletionStage都运行完执行。
- public CompletableFuture
runAfterBoth(CompletionStage<?> other,Runnable action) - public CompletableFuture
runAfterBothAsync(CompletionStage<?> other,Runnable action) - public CompletableFuture
runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor) 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public static void runAfterBothDemo() {
CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
sleep(100);
println("task1 Done");
});
CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
sleep(200);
println("task2 Done");
});
task1.runAfterBoth(task2, () -> println("Task 1 And Task 2 Both Done"))
.join();
}
--------- 输出 -----------
2019-06-23 19:22:08.498: task1 Done
2019-06-23 19:22:08.595: task2 Done
2019-06-23 19:22:08.596: Task 1 And Task 2 Both Done
applyToEither
- 在两个CompletionStage中选择计算快的,将其结果进行下一步的转化操作。
- public CompletableFuture applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
- public CompletableFuture applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
- public CompletableFuture applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public static void applyToEitherDemo() {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
sleep(10);
return "task 1";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
sleep(20);
return "task 2";
});
Integer result = task1.applyToEither(task2, t -> Integer.valueOf(t.split(" ")[1]))
.join();
System.out.println("applyToEitherDemo:" + result);
}
--------- 输出 -----------
applyToEitherDemo:1
acceptEither
- 在两个CompletionStage中选择计算快的,作为下一步计算的结果。
- public CompletableFuture
acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) - public CompletableFuture
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) - public CompletableFuture
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor) 1
2
3
4
5
6
7
8
9
10
11
12
13
14public static void acceptEitherDemo() {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
sleep(10);
return "task 1";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
sleep(20);
return "task 2";
});
task1.acceptEitherAsync(task2, t -> System.out.println("acceptEitherDemo:" +Integer.valueOf(t.split(" ")[1])))
.join();
}
--------- 输出 -----------
acceptEitherDemo:1
runAfterEither
- 两个CompletionStage,任何一个完成了都会执行下一步的操作。
- public CompletableFuture
runAfterEither(CompletionStage<?> other,Runnable action) - public CompletableFuture
runAfterEitherAsync(CompletionStage<?> other,Runnable action) - public CompletableFuture
runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor) 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public static void runAfterEitherDemo() {
CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
sleep(100);
println("task1 Done");
});
CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
sleep(200);
println("task2 Done");
});
task1.runAfterEither(task2, () -> println("Task 1 Or Task 2 Done"))
.join();
sleep(100);
}
--------- 输出 -----------
2019-06-23 19:24:27.644: task1 Done
2019-06-23 19:24:27.645: Task 1 Or Task 2 Done
2019-06-23 19:24:27.749: task2 Done
thenCompose
- 连接两个CompletableFuture,返回值是新的CompletableFuture
- public CompletableFuture thenCompose(Function<? super T, ? extends CompletionStage> fn)
- public CompletableFuture thenComposeAsync(Function<? super T, ? extends CompletionStage> fn)
- public CompletableFuture thenComposeAsync(Function<? super T, ? extends CompletionStage> fn,Executor executor)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public static void thenComposeDemo() {
String result = CompletableFuture.completedFuture("Start")
.thenCompose(x -> CompletableFuture.supplyAsync(() -> {
sleep(20);
return x + " task 2";
}))
.thenCompose(x -> CompletableFuture.supplyAsync(() -> {
sleep(10);
return x + " task 1";
}))
.join();
System.out.println("thenComposeDemo:" +result);
}
--------- 输出 -----------
thenComposeDemo:Start task 2 task 1
whenComplete
- 当运行完成时,对结果的记录。
- 正常执行,返回值。
- 异常抛出造成程序的中断
- 注意,内部线程出现异常会抛到外层,导致外层线程产生异常。
- public CompletableFuture
whenComplete(BiConsumer<? super T, ? super Throwable> action) - public CompletableFuture
whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) - public CompletableFuture
whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40public static void whenCompleteDemo() {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
sleep(10);
return "task 1";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
sleep(20);
throw new RuntimeException("task2 RuntimeException");
});
String task1res = task1.whenComplete((res, exp) -> {
println("task1 res:" + res);
println("task1 exp:" + exp);
}).join();
println(task1res);
String task2res = task2.whenComplete((res, exp) -> {
println("task2 res:" + res);
println("task2 exp:" + exp.getMessage());
}).join();
println(task2res);
}
--------- 输出 -----------
2019-06-23 19:50:03.429 ForkJoinPool.commonPool-worker-1: task1 res:task 1
2019-06-23 19:50:03.430 ForkJoinPool.commonPool-worker-1: task1 exp:null
2019-06-23 19:50:03.430 main: task 1
2019-06-23 19:50:03.439 ForkJoinPool.commonPool-worker-2: task2 res:null
2019-06-23 19:50:03.439 ForkJoinPool.commonPool-worker-2: task2 exp:java.lang.RuntimeException: task2 RuntimeException
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException: task2 RuntimeException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: task2 RuntimeException
at com.example.demo.ThreadTest.lambda$whenCompleteDemo$5(ThreadTest.java:44)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 5 more
handle
- 运行完成时,对结果的处理。这里的完成时有两种情况,
- 正常执行,返回值
- 遇到异常抛出造成程序的中断。
- 异常不会被抛到外层,不会造成外部线程因为异常中断
- public CompletableFuture handle(BiFunction<? super T, Throwable, ? extends U> fn)
- public CompletableFuture handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
- public CompletableFuture handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public static void handleDemo() {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
sleep(10);
return "task 1";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
sleep(20);
throw new RuntimeException("task2 RuntimeException");
});
String task1res = task1.handle((res, exp) -> {
println("task1 res:" + res);
println("task1 exp:" + exp);
return res;
}).join();
println(task1res);
String task2res = task2.handle((res, exp) -> {
println("task2 res:" + res);
println("task2 exp:" + exp.getMessage());
return res;
}).join();
println(task2res);
}
--------- 输出 -----------
2019-06-23 19:50:53.542 ForkJoinPool.commonPool-worker-1: task1 res:task 1
2019-06-23 19:50:53.543 ForkJoinPool.commonPool-worker-1: task1 exp:null
2019-06-23 19:50:53.543 main: task 1
2019-06-23 19:50:53.552 ForkJoinPool.commonPool-worker-2: task2 res:null
2019-06-23 19:50:53.552 ForkJoinPool.commonPool-worker-2: task2 exp:java.lang.RuntimeException: task2 RuntimeException
2019-06-23 19:50:53.552 main: null
exceptionally
- 异常处理逻辑,可以设置异常情况下的返回值
- public CompletionStage
exceptionally(Function<Throwable, ? extends T> fn) 1
2
3
4
5
6
7
8
9
10
11
12
13public static void exceptionallyDemo() {
Object result = CompletableFuture.supplyAsync(() -> {
sleep(20);
throw new RuntimeException("task2 RuntimeException");
}).exceptionally(e->{
System.out.println("exceptionally: " +e);
return e.getMessage();
}).join();
System.out.println("exceptionallyDemo: " +result);
}
--------- 输出 -----------
exceptionally: java.util.concurrent.CompletionException: java.lang.RuntimeException: task2 RuntimeException
exceptionallyDemo: java.lang.RuntimeException: task2 RuntimeException
其他方法
public boolean isDone()
- 是否已经完成(包括正常完成,异常完成,取消完成)
public T get() throws InterruptedException, ExecutionException
- 阻塞方式获取结果
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
- 带超时方式的阻塞方式获取结果
public T join()
- 阻塞至任务完成。会抛出CompletionException(unchecked类型)
public T getNow(T valueIfAbsent)
- 立即获取结果,如果任务没有执行完成,则返回valueIfAbsent
public boolean complete(T value)
- 如果任务还没有执行完成,则用当前值去替换完成值,否则继续使用原始值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40private static void completeDemo() {
println("complete start");
CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
println("runAsync start");
sleep(2000);
println("runAsync end");
return "task run finish";
});
boolean complete = task.complete("finish");
System.out.println("complete:"+complete);
System.out.println("task:"+task.join());
println("complete end");
}
--------- 输出 -----------
2019-06-23 20:55:19.491 main: complete start
complete:true
task:finish
2019-06-23 20:55:19.547 main: complete end
------------------------------------------------------------------------
private static void completeDemo() {
println("complete start");
CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
println("runAsync start");
sleep(1);
println("runAsync end");
return "task run finish";
});
sleep(10);
boolean complete = task.complete("finish");
System.out.println("complete:"+complete);
System.out.println("task:"+task.join());
println("complete end");
}
--------- 输出 -----------
2019-06-23 20:59:32.375 main: complete start
2019-06-23 20:59:32.426 ForkJoinPool.commonPool-worker-1: runAsync start
2019-06-23 20:59:32.428 ForkJoinPool.commonPool-worker-1: runAsync end
complete:false
task:task run finish
2019-06-23 20:59:32.437 main: complete end
- 如果任务还没有执行完成,则用当前值去替换完成值,否则继续使用原始值。
public boolean completeExceptionally(Throwable ex)
- 如果任务还没有执行完成,则以异常的方式中断执行(调用join方法会抛出该异常),如果执行完成,则返回false,并正常执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40private static void completeExceptionallyDemo() {
println("completeExceptionally start");
CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
println("supplyAsync start");
sleep(1);
println("supplyAsync end");
return "task run finish";
});
sleep(10);
boolean complete = task.completeExceptionally(new NullPointerException("Test Null"));
System.out.println("complete:"+complete);
System.out.println("task:"+task.join());
println("complete 1 end");
task = CompletableFuture.supplyAsync(() -> {
println("supplyAsync start");
sleep(1);
println("supplyAsync end");
return "task run finish";
});
complete = task.completeExceptionally(new NullPointerException("Test Null"));
System.out.println("complete:"+complete);
System.out.println("task:"+task.join());
}
--------- 输出 -----------
2019-06-23 21:11:48.276 main: completeExceptionally start
2019-06-23 21:11:48.330 ForkJoinPool.commonPool-worker-1: supplyAsync start
2019-06-23 21:11:48.331 ForkJoinPool.commonPool-worker-1: supplyAsync end
complete:false
task:task run finish
2019-06-23 21:11:48.340 main: complete 1 end
complete:true
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.NullPointerException: Test Null
at java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1934)
at com.example.demo.ThreadTest.completeExceptionallyDemo(ThreadTest.java:35)
at com.example.demo.ThreadTest.main(ThreadTest.java:9)
Caused by: java.lang.NullPointerException: Test Null
at com.example.demo.ThreadTest.completeExceptionallyDemo(ThreadTest.java:33)
... 1 more
- 如果任务还没有执行完成,则以异常的方式中断执行(调用join方法会抛出该异常),如果执行完成,则返回false,并正常执行
public CompletableFuture
toCompletableFuture() - 返回CompletableFuture对象,实际代码中返回this
public boolean cancel(boolean mayInterruptIfRunning)
- 取消任务,mayInterruptIfRunning在当前实现没有任何作用。。(醉了)
- 任务取消后如果执行join方法会抛出CancellationException异常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34private static void cancelDemo() {
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
println("Start");
sleep(1000);
println("End");
}
});
sleep(10);
boolean cancel = future.cancel(true);
println(cancel);
System.out.println("----------------------------------");
future = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
println("Start");
sleep(10);
println("End");
}
});
sleep(100);
cancel = future.cancel(true);
println(cancel);
}
--------- 输出 -----------
2019-06-23 21:56:33.060 ForkJoinPool.commonPool-worker-1: Start
2019-06-23 21:56:33.072 main: true
----------------------------------
2019-06-23 21:56:33.075 ForkJoinPool.commonPool-worker-2: Start
2019-06-23 21:56:33.086 ForkJoinPool.commonPool-worker-2: End
2019-06-23 21:56:33.176 main: false
public boolean isCancelled()
- 返回当前任务是否已经被取消
public boolean isCompletedExceptionally()
- 返回当前任务是否异常方式中断
public void obtrudeValue(T value)
- 将future的结果强制更改为value,无论是否发生异常
public void obtrudeException(Throwable ex)
- 将future的结果强制更改为异常,只要调用get或者join均会抛出该异常,同时会修改isCompletedExceptionally的结果
public int getNumberOfDependents()
- 返回有多少个后续stage依赖当前stage
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28private static void getNumberOfDependentsDemo() {
CompletableFuture<String> t1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "1";
});
CompletableFuture<String> t2 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "2";
});
sleep(10);
println(t1.getNumberOfDependents());
println(t2.getNumberOfDependents());
CompletableFuture<Void> all = CompletableFuture.allOf(t1, t2);
println(all.isDone());
println(t1.getNumberOfDependents());
println(t2.getNumberOfDependents());
all.join();
println(t1.getNumberOfDependents());
println(t2.getNumberOfDependents());
}
--------- 输出 -----------
2019-06-23 22:26:45.132 main: 0
2019-06-23 22:26:45.133 main: 0
2019-06-23 22:26:45.133 main: false
2019-06-23 22:26:45.133 main: 1
2019-06-23 22:26:45.133 main: 1
2019-06-23 22:26:46.122 main: 0
2019-06-23 22:26:46.122 main: 0
参考
- 返回有多少个后续stage依赖当前stage