Netty解析八:可写的Future类【Promise】
Special {@link Future} which is writable.
Promise的类注释中描述了它的用途:特殊的Future是可修改的。
解析
Promise接口
Promise接口提供了一些set方法用于设置返回值;Promise接口的设计是链式的,许多方法返回的都是Promise接口本身。
1 | public interface Promise<V> extends Future<V> { |
实现类
Promise接口的实现类众多,这里先挑选三个典型的实现类解析,DefaultPromise、PromiseTask和ScheduledFutureTask。

DefaultPromise类
DefaultPromise是Promise最基本的实现类。DefaultPromise是实现管理器的公用方法,仅仅是对任务执行的描述,但是并没有真正的操作任务。
继承关系与成员变量
1 | public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { |
几个重点属性
- SUCCESS:异步操作正常完成
- UNCANCELLABLE:异步操作不可取消,并且尚未完成
- CANCELLATION_CAUSE_HOLDER:异步操作取消监听,用于cancel操作,而CauseHolder 的实例对象是用来表示异步操作异常结束,同时保存异常信息
- result:执行后的返回值,取值有:SUCCESS 、UNCANCELLABLE、CauseHolder以及V
- listeners:监听器,可以是GenericFutureListener,DefaultFutureListeners
- waiters:wait()等待的线程数
- notifyingListeners:是否需要通知
核心方法
是否完成isDone
1 |
|
setSuccess方法
几个类似的设置方法
setSuccess/trySuccess/tryFailure/setUncancellable这些方法实现上都一样,都是通过cas设置result属性的值。这里列举setSuccess作为展示。
1 |
|
唤醒等待的线程
1 | private synchronized boolean checkNotifyWaiters() { |
通知监听器notifyListeners
notifyListeners中调用notifyListenersNow通知所有的监听器。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
// 通知所有的监听器
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
safeExecute(executor, new Runnable() {
public void run() {
notifyListenersNow();
}
});
}
notifyListenersNow1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31private void notifyListenersNow() {
Object listeners;
synchronized (this) {
// Only proceed if there are listeners to notify and we are not already notifying listeners.
// notifyingListeners为true代表正在处理中
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
// 区分不同类型的Listener
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
// Nothing can throw from within this method, so setting notifyingListeners back to false does not
// need to be in a finally block.
notifyingListeners = false;
return;
}
listeners = this.listeners;
this.listeners = null;
}
}
}
notifyListeners0
如果是DefaultFutureListeners则遍历其中所有的listener1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}
最终执行GenericFutureListener的operationComplete方法。
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
添加监听器addListener
1 |
|
获取执行结果get
1 |
|
PromiseTask类
PromiseTask继承DefaultPromise类并实现了RunnableFuture接口,集Runnable和Future于一体。相对于DefaultPromise而言,PromiseTask对任务做了操作的处理,真正的在执行任务后进行返回值设置。
继承关系以及成员变量
在task完成后会将task属性设置成对应的Runnable:COMPLETED,CANCELLED,FAILED1
2
3
4
5
6
7class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
private static final Runnable COMPLETED = new SentinelRunnable("COMPLETED");
private static final Runnable CANCELLED = new SentinelRunnable("CANCELLED");
private static final Runnable FAILED = new SentinelRunnable("FAILED");
// 任务,runnable或者callable
private Object task;
}
SentinelRunnable是一个空任务类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15private static class SentinelRunnable implements Runnable {
private final String name;
SentinelRunnable(String name) {
this.name = name;
}
public void run() { } // no-op
public String toString() {
return name;
}
}
DefaultPromise可修改返回值,在PromiseTask不允许对象本身之外修改返回值,所以执行对应的方法会直接抛异常。例如设置成功:1
2
3
4
5
6
7
8
9
10
public final Promise<V> setSuccess(V result) {
throw new IllegalStateException();
}
protected final Promise<V> setSuccessInternal(V result) {
super.setSuccess(result);
clearTaskAfterCompletion(true, COMPLETED);
return this;
}
runTask方法
1 | final V runTask() throws Exception { |
run方法
1 |
|
ScheduledFutureTask类
ScheduledFutureTask类实现延迟任务管理,不仅支持延迟执行也可以根据周期一直运行。
ScheduledFutureTask类同时实现了优先级队列节点接口(PriorityQueueNode)。
继承关系与成员变量
1 | final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode { |
获取延迟时间getDelay
在解析ScheduledThreadPoolExecutor线程的那篇文章中解析了jdk的ScheduledFutureTask,其中它的getDelay方法就是用于获取延迟/定时任务的延迟时间,这是实现延时/定时的核心方法。1
2
3public long getDelay(TimeUnit unit) {
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
}
1 | public long delayNanos() { |
任务运行run
在run中执行具体的任务并处理周期任务的运行。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void run() {
// 在事件循环中
assert executor().inEventLoop();
try {
// 不用周期运行
if (periodNanos == 0) {
if (setUncancellableInternal()) {
V result = runTask();
setSuccessInternal(result);
}
} else {
// check if is done as it may was cancelled
if (!isCancelled()) {
// 执行
runTask();
if (!executor().isShutdown()) {
// 处理周期运行
if (periodNanos > 0) {
deadlineNanos += periodNanos;
} else {
deadlineNanos = nanoTime() - periodNanos;
}
if (!isCancelled()) {
// scheduledTaskQueue can never be null as we lazy init it before submit the task!
Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
assert scheduledTaskQueue != null;
scheduledTaskQueue.add(this);
}
}
}
}
} catch (Throwable cause) {
setFailureInternal(cause);
}
}
大小比较compareTo
compareTo用于优先级队列。
延迟时间小的小,然后id小的小。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else {
assert id != that.id;
return 1;
}
}
队列序号设置priorityQueueIndex
1 |
|
}`