Java线程池二:AbstractExecutorService
概述
AbstractExecutorService是jdk线程池实现的基类,它实现了ExecutorService接口的部分方法。
解析
AbstractExecutorService
AbstractExecutorService实现了批量处理的几个方法,例如doInvokeAny、invokeAll、cancelAll等。

继承关系
1 | public abstract class AbstractExecutorService implements ExecutorService {} |
主要方法
构建运行单元newTaskFor
RunnableFuture是线程池的运行单元,Runnable和Callable都会转化为RunnableFuture,具体可看FutureTask解析。
1 | protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { |
提交任务submit
submit中奖Runnable任务封装成RunnableFuture,任何交由execute()方法去运行;execute()方法在AbstractExecutorService类中是抽象方法,由实现类实现。1
2
3
4
5
6
7
8
9
10
11
12public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
执行任意invokeAny
invokeAny:启动多个线程,相互独立的(无同步)去计算一个结果,当某一个线程得到结果之后,立刻终止所有线程,因为只需要一个结果就够了。
1 | public <T> T invokeAny(Collection<? extends Callable<T>> tasks) |
invokeAny最终都通过doInvokeAny方法实现具体逻辑。
- 使用ExecutorCompletionService来为维护完成的任务,逻辑是:当任务完成调用done方法时将其加入BlockingQueue;
- 通过BlockingQueue来控制是限定时间取还是堵塞取,当取到后将所有任务中断,然后返回返回值;
1 | private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, |
全部执行invokeAll
使用execute()方法执行1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
// 使用execute执行
execute(f);
}
// 获取返回值
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try { f.get(); }
catch (CancellationException | ExecutionException ignore) {}
}
}
return futures;
} catch (Throwable t) {
cancelAll(futures);
throw t;
}
}
// 获取返回值加入超时时间
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
final long nanos = unit.toNanos(timeout);
final long deadline = System.nanoTime() + nanos;
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
int j = 0;
timedOut: try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
final int size = futures.size();
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L)
break timedOut;
execute((Runnable)futures.get(i));
}
for (; j < size; j++) {
Future<T> f = futures.get(j);
if (!f.isDone()) {
try { f.get(deadline - System.nanoTime(), NANOSECONDS); }
catch (CancellationException | ExecutionException ignore) {}
catch (TimeoutException timedOut) {
break timedOut;
}
}
}
return futures;
} catch (Throwable t) {
cancelAll(futures);
throw t;
}
// Timed out before all the tasks could be completed; cancel remaining
cancelAll(futures, j);
return futures;
}
全部取消cancelAll()
取消实际执行FutureTask的cancel方法,在入参为true时强制中断。1
2
3
4
5
6
7
8
9private static <T> void cancelAll(ArrayList<Future<T>> futures) {
cancelAll(futures, 0);
}
/** Cancels all futures with index at least j. */
private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) {
for (int size = futures.size(); j < size; j++)
futures.get(j).cancel(true);
}