概述

AbstractExecutorService是jdk线程池实现的基类,它实现了ExecutorService接口的部分方法。

解析

AbstractExecutorService

AbstractExecutorService实现了批量处理的几个方法,例如doInvokeAny、invokeAll、cancelAll等。

Executor实现类

继承关系

1
public abstract class AbstractExecutorService implements ExecutorService {}

主要方法

构建运行单元newTaskFor

RunnableFuture是线程池的运行单元,Runnable和Callable都会转化为RunnableFuture,具体可看FutureTask解析。

1
2
3
4
5
6
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
提交任务submit

submit中奖Runnable任务封装成RunnableFuture,任何交由execute()方法去运行;execute()方法在AbstractExecutorService类中是抽象方法,由实现类实现。

1
2
3
4
5
6
7
8
9
10
11
12
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

执行任意invokeAny

invokeAny:启动多个线程,相互独立的(无同步)去计算一个结果,当某一个线程得到结果之后,立刻终止所有线程,因为只需要一个结果就够了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

invokeAny最终都通过doInvokeAny方法实现具体逻辑。

  1. 使用ExecutorCompletionService来为维护完成的任务,逻辑是:当任务完成调用done方法时将其加入BlockingQueue;
  2. 通过BlockingQueue来控制是限定时间取还是堵塞取,当取到后将所有任务中断,然后返回返回值;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
// ExecutorCompletionService 用于记录完成的任务
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);

// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.

try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();

// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;

for (;;) {
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
--ntasks;
// 执行,实际执行的还是this
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
// 从完成队列(BlockingQueue)中取
f = ecs.poll(nanos, NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take(); // 堵塞取
}
if (f != null) {
--active;
try {
return f.get(); // 返回
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}

if (ee == null)
ee = new ExecutionException();
throw ee;

} finally {
// 取消其他的(直接中断)
cancelAll(futures);
}
}
全部执行invokeAll

使用execute()方法执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
// 使用execute执行
execute(f);
}
// 获取返回值
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try { f.get(); }
catch (CancellationException | ExecutionException ignore) {}
}
}
return futures;
} catch (Throwable t) {
cancelAll(futures);
throw t;
}
}
// 获取返回值加入超时时间
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
final long nanos = unit.toNanos(timeout);
final long deadline = System.nanoTime() + nanos;
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
int j = 0;
timedOut: try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));

final int size = futures.size();

// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L)
break timedOut;
execute((Runnable)futures.get(i));
}

for (; j < size; j++) {
Future<T> f = futures.get(j);
if (!f.isDone()) {
try { f.get(deadline - System.nanoTime(), NANOSECONDS); }
catch (CancellationException | ExecutionException ignore) {}
catch (TimeoutException timedOut) {
break timedOut;
}
}
}
return futures;
} catch (Throwable t) {
cancelAll(futures);
throw t;
}
// Timed out before all the tasks could be completed; cancel remaining
cancelAll(futures, j);
return futures;
}

全部取消cancelAll()

取消实际执行FutureTask的cancel方法,在入参为true时强制中断。

1
2
3
4
5
6
7
8
9
private static <T> void cancelAll(ArrayList<Future<T>> futures) {
cancelAll(futures, 0);
}

/** Cancels all futures with index at least j. */
private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) {
for (int size = futures.size(); j < size; j++)
futures.get(j).cancel(true);
}