0%

Java线程池分析-AbstractExecutorService

内部只有若干方法

内部依赖方法

newTaskFor(Runnable runnable, T value)

  • 构造一个FutureTask,FutureTask 实现了 RunnableFuture,既是Runnable接口,也是Future接口,类似于适配器。
    1
    2
    3
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
    }

newTaskFor(Callable callable)

  • 同上,将一个Callable适配到RunnableFuture
    1
    2
    3
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
    }

submit 提交任务方法

submit(Runnable task)

  • 将Runnable接口封装为 RunnableFuture,并由子类实现执行逻辑
    1
    2
    3
    4
    5
    6
    public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
    }

submit(Runnable task, T result)

  • 将Runnable接口封装为 RunnableFuture,并由子类实现执行逻辑
    1
    2
    3
    4
    5
    6
    public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
    }

submit(Callable task)

  • 将Callable接口封装为 RunnableFuture,并由子类实现执行逻辑
    1
    2
    3
    4
    5
    6
    public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
    }

Invoke系列方法

doInvokeAny

  • 执行tasks任务,可以指定是否带有超时参数。invokeAny方法底层依赖该方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {
    if (tasks == null)
    throw new NullPointerException();
    int ntasks = tasks.size();
    if (ntasks == 0)
    throw new IllegalArgumentException();
    //全部task对应的future集合
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    //实际执行的实体
    ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
    try {
    ExecutionException ee = null;
    //是否需要超时
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Iterator<? extends Callable<T>> it = tasks.iterator();
    //先提交一个任务
    futures.add(ecs.submit(it.next()));
    //任务数减一
    --ntasks;
    //工作中的线程数为1
    int active = 1;
    for (;;) {
    Future<T> f = ecs.poll();//获取一个执行的任务
    //判断任务是否完成,为null则还没有执行完成
    if (f == null) {
    //提交的任务是否已经全部由ecs执行,如果还有未提交的,则继续提交。
    if (ntasks > 0) {
    --ntasks;
    futures.add(ecs.submit(it.next()));
    ++active;
    }
    else if (active == 0)//没有存活的任务,说明任务已经完成,但是有异常,导致active=0,则中断循环,然后抛出异常
    break;
    else if (timed) {//检查是否需要超时
    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
    if (f == null)
    throw new TimeoutException();
    nanos = deadline - System.nanoTime();
    }
    else
    f = ecs.take();//不许要超时,则阻塞获取
    }
    if (f != null) {//有任务完成,active数量减一,并返回结果
    --active;
    try {
    return f.get();
    } catch (ExecutionException eex) {
    ee = eex;
    } catch (RuntimeException rex) {
    ee = new ExecutionException(rex);
    }
    }
    }
    if (ee == null)
    ee = new ExecutionException();
    throw ee;
    } finally {
    for (int i = 0, size = futures.size(); i < size; i++)
    futures.get(i).cancel(true);//已经完成或者抛出异常,取消其他正在执行的任务。
    }
    }

invokeAny

  • 忽略超时异常的执行方式

    1
    2
    3
    4
    5
    6
    7
    8
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
    try {
    return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {//忽略超时异常
    assert false;
    return null;
    }
    }
  • 可以设置超时的执行方式

    1
    2
    3
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    return doInvokeAny(tasks, true, unit.toNanos(timeout));//会抛出超时异常
    }

invokeAll

  • 全部执行并等待全部完成
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
    if (tasks == null)
    throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
    //全部任务提交并执行
    for (Callable<T> t : tasks) {
    RunnableFuture<T> f = newTaskFor(t);
    futures.add(f);
    execute(f);
    }
    //等待全部结果完成
    for (int i = 0, size = futures.size(); i < size; i++) {
    Future<T> f = futures.get(i);
    if (!f.isDone()) {
    try {
    f.get();
    } catch (CancellationException ignore) {
    } catch (ExecutionException ignore) {
    }
    }
    }
    //全部完成后标示位更新
    done = true;
    return futures;
    } finally {
    if (!done)//如果没有完成,说明有异常,则取消所有任务
    for (int i = 0, size = futures.size(); i < size; i++)
    futures.get(i).cancel(true);
    }
    }
  • 全部执行,并带有超时的等待完成
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
    throws InterruptedException {
    if (tasks == null)
    throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
    for (Callable<T> t : tasks)
    futures.add(newTaskFor(t));
    final long deadline = System.nanoTime() + nanos;
    final int size = futures.size();
    //提交任务
    for (int i = 0; i < size; i++) {
    execute((Runnable)futures.get(i));
    //计算超时时间
    nanos = deadline - System.nanoTime();
    if (nanos <= 0L)
    return futures;
    }
    //获取结果
    for (int i = 0; i < size; i++) {
    Future<T> f = futures.get(i);
    if (!f.isDone()) {
    if (nanos <= 0L)//已经超时,则直接返回现有的
    return futures;
    try {
    f.get(nanos, TimeUnit.NANOSECONDS);//带有超时的去获取,如果超时,则直接返回结果
    } catch (CancellationException ignore) {
    } catch (ExecutionException ignore) {
    } catch (TimeoutException toe) {
    return futures;
    }
    nanos = deadline - System.nanoTime();
    }
    }
    done = true;//正常完成
    return futures;
    } finally {
    if (!done)//非正常完成,则取消剩余任务
    for (int i = 0, size = futures.size(); i < size; i++)
    futures.get(i).cancel(true);
    }
    }