概述

ScheduledThreadPoolExecutor可以用来在给定延时后执行异步任务或者周期性执行任务。

解析

ScheduledThreadPoolExecutor内部实现了一个延迟队列DelayedWorkQueue,先对延迟队列的实现做解析。

在延迟队列解析前先对ScheduledThreadPoolExecutor的执行任务类(ScheduledFutureTask)作解析。

定时任务执行单元ScheduledFutureTask

延迟队列DelayedWorkQueue使用堆排序实现,延迟时间最小的任务排在对顶;

ScheduledThreadPoolExecutor中取任务后判断任务延时时间,并且是否当前线程持有任务,不是当前线程持有则等待await(),是当前线程则等待对应的延时available.awaitNanos(delay)。

继承关系

1
2
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {}

成员变量

1
2
3
4
5
6
7
8
9
10
// 序列荷坳
private final long sequenceNumber;
// 下次运行时间
private volatile long time;
// 周期性运行的运行周期
private final long period;

RunnableScheduledFuture<V> outerTask = this;
// 堆排序中的数组序号
int heapIndex;

构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ScheduledFutureTask(Runnable r, V result, long triggerTime,
long sequenceNumber) {
super(r, result);
this.time = triggerTime;
this.period = 0;
this.sequenceNumber = sequenceNumber;
}
ScheduledFutureTask(Runnable r, V result, long triggerTime,
long period, long sequenceNumber) {
super(r, result);
this.time = triggerTime;
this.period = period;
this.sequenceNumber = sequenceNumber;
}
ScheduledFutureTask(Callable<V> callable, long triggerTime,
long sequenceNumber) {
super(callable);
this.time = triggerTime;
this.period = 0;
this.sequenceNumber = sequenceNumber;
}

比较compareTo(Delayed other)

ScheduledFutureTaskd比较的话先比较下次运行时间,如果相等在比较sequenceNumber

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

是否周期运行isPeriodic()

1
2
3
public boolean isPeriodic() {
return period != 0;
}

设置下次运行时间

1
2
3
4
5
6
7
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}

执行任务run()

是否能运行依赖于ScheduledThreadPoolExecutor的canRunInCurrentRunState方法

run方法中不关心是否到执行的时间了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void run() {
// 当前能不能跑,不能跑就取消
if (!canRunInCurrentRunState(this))
cancel(false);
// 不是周期运行
else if (!isPeriodic())
super.run();
// 周期运行,runAndReset执行后不会变更任务状态,所以可以重复执行
else if (super.runAndReset()) {
// 更新下次运行的时间
setNextRunTime();
// 再次周期性执行
reExecutePeriodic(outerTask);
}
}

是否能运行canRunInCurrentRunState(RunnableScheduledFuture<?> task)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
//没有大于SHUTDOWN状态
if (!isShutdown())
return true;
// 停止了
if (isStopped())
return false;
// continueExistingPeriodicTasksAfterShutdown 周期性任务是否能在shutdown后继续运行
// 只有SHUTDOWN状态才能走到这里
// 如果该任务是周期性运行的则返回continueExistingPeriodicTasksAfterShutdown
// 不然判断continueExistingPeriodicTasksAfterShutdown或者延迟时间小于0
return task.isPeriodic()
? continueExistingPeriodicTasksAfterShutdown
: (executeExistingDelayedTasksAfterShutdown
|| task.getDelay(NANOSECONDS) <= 0);
}

reExecutePeriodic()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(task)) {
// 再次加入队列
super.getQueue().add(task);
// 能执行但是无法移除
if (canRunInCurrentRunState(task) || !remove(task)) {
// 添加worker
ensurePrestart();
return;
}
}
// 取消
task.cancel(false);
}

延迟队列DelayedWorkQueue

继承关系

1
2
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {}

成员变量

1
2
3
4
5
6
7
8
// 初始容量
private static final int INITIAL_CAPACITY = 16;
// 底层存储数据结构
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
// 当前队列长度
private int size;

DelayedWorkQueue队列的底层结构是使用数组来存储,保存数据时采用堆排序的方式。

底层数据操作方法

设置堆排序数组位置

1
2
3
4
private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
if (f instanceof ScheduledFutureTask)
((ScheduledFutureTask)f).heapIndex = idx;
}

左叶添加节点

1
2
3
4
5
6
7
8
9
10
11
12
13
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}

上部插入节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}

扩容grow()
1.5倍扩容

1
2
3
4
5
6
7
private void grow() {
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
queue = Arrays.copyOf(queue, newCapacity);
}

peek()

1
2
3
4
5
6
7
8
9
public RunnableScheduledFuture<?> peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return queue[0];
} finally {
lock.unlock();
}
}

offer(Runnable x)

向左叶子节点插入元素x,并唤醒消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e); // 叶子节点插入
}
if (queue[0] == e) {
leader = null;
available.signal(); // 唤醒消费者
}
} finally {
lock.unlock();
}
return true;
}
public void put(Runnable e) {
offer(e);
}

public boolean add(Runnable e) {
return offer(e);
}

public boolean offer(Runnable e, long timeout, TimeUnit unit) {
return offer(e);
}

取元素take()

  1. 获得锁
  2. 循环:
    取出头结点元素,如果该元素有延时,则判断leader指向的线程是不是当前线程,如果不是则等待await(),是当前线程则等待对应的延时available.awaitNanos(delay)。
  3. leader为空且头结点不为空,则available.signal();
  4. 释放锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 取出头节点
RunnableScheduledFuture<?> first = queue[0];
if (first == null) // 没有则等待
available.await();
else {
// 处理延迟
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return finishPoll(first);
first = null; // don't retain ref while waiting
// 其他线程在等待这个元素延迟完成
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}

取出元素后更新堆排序finishPoll

1
2
3
4
5
6
7
8
9
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}

定时任务线程池ScheduledThreadPoolExecutor

继承关系

1
2
3
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {}

成员变量

1
2
3
4
5
6
7
8
9
10
// 线程池停止时周期任务是否继续运行
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
// 线程池停止时延迟任务是否继续运行
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
// 任务取消时是否移出任务队列
volatile boolean removeOnCancel;
// 任务序号用于执行时的排序(优先)
private static final AtomicLong sequencer = new AtomicLong();
// 默认线程存活时间
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;

构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}

核心方法

普通任务执行入口方法
1
2
3
4
5
6
7
8
9
10
11
12
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
延时/定时任务执行入口方法

decorateTask() 是个空方法,保留给后续实现类扩展。

延时/定时任务的核心就在于delayedExecute(t)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<Void> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit),
sequencer.getAndIncrement()));
delayedExecute(t);
return t;
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit),
sequencer.getAndIncrement()));
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0L)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period),
sequencer.getAndIncrement());
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
任务提交核心delayedExecute(RunnableScheduledFuture<?> task)

将任务加入到队列中,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown()) // 如果已经结束则拒绝任务
reject(task);
else {
super.getQueue().add(task); // 将任务加入队列
// 再判断一下当前线程池运行状态,如果当前状态不能运行任务了就移出并取消任务
if (!canRunInCurrentRunState(task) && remove(task))
task.cancel(false);
else
ensurePrestart(); // 增加线程
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}