Special {@link Future} which is writable.

Promise的类注释中描述了它的用途:特殊的Future是可修改的。

Promise在Netty中代表任务执行的结果。

解析

Promise接口

Promise接口提供了一些set方法用于设置返回值;Promise接口的设计是链式的,许多方法返回的都是Promise接口本身。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public interface Promise<V> extends Future<V> {

Promise<V> setSuccess(V result);

boolean trySuccess(V result);

Promise<V> setFailure(Throwable cause);

boolean tryFailure(Throwable cause);

boolean setUncancellable();

@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

@Override
Promise<V> await() throws InterruptedException;

@Override
Promise<V> awaitUninterruptibly();

@Override
Promise<V> sync() throws InterruptedException;

@Override
Promise<V> syncUninterruptibly();
}

实现类

Promise接口的实现类众多,这里先挑选三个典型的实现类解析,DefaultPromise、PromiseTask和ScheduledFutureTask。

Promise接口实现类

DefaultPromise类

DefaultPromise是Promise最基本的实现类。DefaultPromise是实现管理器的公用方法,仅仅是对任务执行的描述,但是并没有真正的操作任务。

继承关系与成员变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
private static final InternalLogger rejectedExecutionLogger =
InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
// 可以嵌套的Listener的最大层数,可见最大值为8
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
@SuppressWarnings("rawtypes")
// jdk9 unsafe类的替代实现,DefaultPromise类的result字段原子操作
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// 异步操作正常完成
private static final Object SUCCESS = new Object();
// 异步操作不可取消,并且尚未完成
private static final Object UNCANCELLABLE = new Object();
// 异步操作取消监听,用于cancel操作,而CauseHolder 的实例对象是用来表示异步操作异常结束,同时保存异常信息
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
new CancellationException(), DefaultPromise.class, "cancel(...)"));
private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
// 执行后的返回值
private volatile Object result;
// 执行器
private final EventExecutor executor;
// 监听器,可以是GenericFutureListener,DefaultFutureListeners
private Object listeners;
// wait()等待的线程数
private short waiters;
// 是否需要通知
private boolean notifyingListeners;
}

几个重点属性

  • SUCCESS:异步操作正常完成
  • UNCANCELLABLE:异步操作不可取消,并且尚未完成
  • CANCELLATION_CAUSE_HOLDER:异步操作取消监听,用于cancel操作,而CauseHolder 的实例对象是用来表示异步操作异常结束,同时保存异常信息
  • result:执行后的返回值,取值有:SUCCESS 、UNCANCELLABLE、CauseHolder以及V
  • listeners:监听器,可以是GenericFutureListener,DefaultFutureListeners
  • waiters:wait()等待的线程数
  • notifyingListeners:是否需要通知
核心方法
是否完成isDone
1
2
3
4
5
6
7
@Override
public boolean isDone() {
return isDone0(result);
}
private static boolean isDone0(Object result) {
return result != null && result != UNCANCELLABLE;
}
setSuccess方法

几个类似的设置方法

setSuccess/trySuccess/tryFailure/setUncancellable这些方法实现上都一样,都是通过cas设置result属性的值。这里列举setSuccess作为展示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
// cas更新
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
// 修改成功唤醒等待的线程,并通知监听器
if (checkNotifyWaiters()) {
notifyListeners();
}
return true;
}
return false;
}

唤醒等待的线程

1
2
3
4
5
6
private synchronized boolean checkNotifyWaiters() {
if (waiters > 0) {
notifyAll();
}
return listeners != null;
}
通知监听器notifyListeners

notifyListeners中调用notifyListenersNow通知所有的监听器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
// 通知所有的监听器
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}

safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}

notifyListenersNow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
// Only proceed if there are listeners to notify and we are not already notifying listeners.
// notifyingListeners为true代表正在处理中
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
// 区分不同类型的Listener
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
// Nothing can throw from within this method, so setting notifyingListeners back to false does not
// need to be in a finally block.
notifyingListeners = false;
return;
}
listeners = this.listeners;
this.listeners = null;
}
}
}

notifyListeners0

如果是DefaultFutureListeners则遍历其中所有的listener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}

最终执行GenericFutureListener的operationComplete方法。
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}

添加监听器addListener
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");

synchronized (this) {
addListener0(listener);
}

if (isDone()) {
notifyListeners();
}

return this;
}
获取执行结果get
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public V get() throws InterruptedException, ExecutionException {
Object result = this.result;
if (!isDone0(result)) {
// 还没完成则等待
await();
result = this.result;
}
if (result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
Throwable cause = cause0(result);
if (cause == null) {
return (V) result;
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}

PromiseTask类

PromiseTask继承DefaultPromise类并实现了RunnableFuture接口,集Runnable和Future于一体。相对于DefaultPromise而言,PromiseTask对任务做了操作的处理,真正的在执行任务后进行返回值设置。

继承关系以及成员变量

在task完成后会将task属性设置成对应的Runnable:COMPLETED,CANCELLED,FAILED

1
2
3
4
5
6
7
class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
private static final Runnable COMPLETED = new SentinelRunnable("COMPLETED");
private static final Runnable CANCELLED = new SentinelRunnable("CANCELLED");
private static final Runnable FAILED = new SentinelRunnable("FAILED");
// 任务,runnable或者callable
private Object task;
}

SentinelRunnable是一个空任务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static class SentinelRunnable implements Runnable {
private final String name;

SentinelRunnable(String name) {
this.name = name;
}

@Override
public void run() { } // no-op

@Override
public String toString() {
return name;
}
}

DefaultPromise可修改返回值,在PromiseTask不允许对象本身之外修改返回值,所以执行对应的方法会直接抛异常。例如设置成功:

1
2
3
4
5
6
7
8
9
10
@Override
public final Promise<V> setSuccess(V result) {
throw new IllegalStateException();
}

protected final Promise<V> setSuccessInternal(V result) {
super.setSuccess(result);
clearTaskAfterCompletion(true, COMPLETED);
return this;
}

runTask方法
1
2
3
4
5
6
7
8
9
final V runTask() throws Exception {
final Object task = this.task;
// 执行任务
if (task instanceof Callable) {
return ((Callable<V>) task).call();
}
((Runnable) task).run();
return null;
}
run方法
1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void run() {
try {
// 设置不可中断
if (setUncancellableInternal()) {
V result = runTask();
// 设置结果
setSuccessInternal(result);
}
} catch (Throwable e) {
setFailureInternal(e);
}
}

ScheduledFutureTask类

ScheduledFutureTask类实现延迟任务管理,不仅支持延迟执行也可以根据周期一直运行。

ScheduledFutureTask类同时实现了优先级队列节点接口(PriorityQueueNode)。

继承关系与成员变量
1
2
3
4
5
6
7
8
9
10
11
12
13
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
private static final long START_TIME = System.nanoTime();
// 任务id,用于比较大小
private long id;
// 结束时间
private long deadlineNanos;
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
// 周期任务的执行周期
private final long periodNanos;
// 队列序号
private int queueIndex = INDEX_NOT_IN_QUEUE;

}
获取延迟时间getDelay

在解析ScheduledThreadPoolExecutor线程的那篇文章中解析了jdk的ScheduledFutureTask,其中它的getDelay方法就是用于获取延迟/定时任务的延迟时间,这是实现延时/定时的核心方法。

1
2
3
public long getDelay(TimeUnit unit) {
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
}

1
2
3
4
5
6
7
8
9
public long delayNanos() {
return deadlineToDelayNanos(deadlineNanos());
}
static long deadlineToDelayNanos(long deadlineNanos) {
return Math.max(0, deadlineNanos - nanoTime());
}
static long nanoTime() {
return System.nanoTime() - START_TIME;
}
任务运行run

在run中执行具体的任务并处理周期任务的运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Override
public void run() {
// 在事件循环中
assert executor().inEventLoop();
try {
// 不用周期运行
if (periodNanos == 0) {
if (setUncancellableInternal()) {
V result = runTask();
setSuccessInternal(result);
}
} else {
// check if is done as it may was cancelled
if (!isCancelled()) {
// 执行
runTask();
if (!executor().isShutdown()) {
// 处理周期运行
if (periodNanos > 0) {
deadlineNanos += periodNanos;
} else {
deadlineNanos = nanoTime() - periodNanos;
}
if (!isCancelled()) {
// scheduledTaskQueue can never be null as we lazy init it before submit the task!
Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
assert scheduledTaskQueue != null;
scheduledTaskQueue.add(this);
}
}
}
}
} catch (Throwable cause) {
setFailureInternal(cause);
}
}

大小比较compareTo

compareTo用于优先级队列。

延迟时间小的小,然后id小的小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}

ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else {
assert id != that.id;
return 1;
}
}

队列序号设置priorityQueueIndex
1
2
3
4
5
6
7
8
9
@Override
public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
return queueIndex;
}

@Override
public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
queueIndex = i;
}

}
`