Java线程池一:基本概念与FutureTask
概述
Java的线程池一直在用,最近的抽奖也用到线程池来实现异步,但是对于线程的配置还有具体的实现还是很懵懂的,所以打算对线程池的源码进行阅读。
解析
jdk提供的线程池的基本入口是 java.util.concurrent.Executor
Executor可以理解为线程的执行器,优雅的解耦了任务处理机制中的任务提交和任务如何运行(也包含线程的使用,调度)
Executor
1 | public interface Executor { |
Executor框架包括3大部分:
- 任务。也就是工作单元,包括被执行任务需要实现的接口:Runnable接口或者Callable接口;
- 任务的执行。也就是把任务分派给多个线程的执行机制,包括Executor接口及继承自Executor接口的ExecutorService接口;
- 异步计算的结果。包括Future接口及实现了Future接口的FutureTask类。
runnable实现类,RunnableFuture是jdk线程池运行单元,它的实现类是FutureTask;通过RunnableFuture对运算单元的封装使得线程池与Runnable、Callable接口解耦。
ExecutorService
ExecutorService在Executor的基础上定义了更多的方法,例如任务批量提交、是否停止、是否中断等。
Executor实现类:
RunnableFuture
RunnableFuture接口统一定义了运行单元。
1 | public interface RunnableFuture<V> extends Runnable, Future<V> { |
FutureTask
继承关系
1 | public class FutureTask<V> implements RunnableFuture<V> {} |
成员变量
1 | // 运行状态 |
构造器
如果是Runnable对象会使用Executors.callable工具进行转换(适配器)。
初始状态是 NEW1
2
3
4
5
6
7
8
9
10
11public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
主要方法
执行且记录返回值run()
1 | public void run() { |
设置异常setException(Throwable t)
状态改为完成COMPLETING1
2
3
4
5
6
7protected void setException(Throwable t) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = t;
STATE.setRelease(this, EXCEPTIONAL); // final state
finishCompletion();
}
}
设置返回值set()
状态改为完成COMPLETING1
2
3
4
5
6
7protected void set(V v) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = v;
STATE.setRelease(this, NORMAL); // final state
finishCompletion();
}
}
完成后续操作finishCompletion()
释放WaitNode(等待运行结束获取结果的线程)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// waiters 置空
if (WAITERS.weakCompareAndSet(this, q, null)) {
// 释放WaitNode链表
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 唤起线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 空方法
done();
callable = null; // to reduce footprint
}
执行并且不记录返回值 runAndReset()
1 | protected boolean runAndReset() { |
可能中断处理handlePossibleCancellationInterrupt(int s)
中断状态让出cpu时间片1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
取消cancel(boolean mayInterruptIfRunning)
NEW状态才能取消或者中断
1 | public boolean cancel(boolean mayInterruptIfRunning) { |
获取返回值get()
1 | public V get() throws InterruptedException, ExecutionException { |
等待执行完成awaitDone(boolean timed, long nanos)
等待执行完成或者等待一定的时间,返回当前状态。
1 | // time是否有等待时间限制 |