概述
ScheduledThreadPoolExecutor可以用来在给定延时后执行异步任务或者周期性执行任务。
解析
ScheduledThreadPoolExecutor内部实现了一个延迟队列DelayedWorkQueue,先对延迟队列的实现做解析。
在延迟队列解析前先对ScheduledThreadPoolExecutor的执行任务类(ScheduledFutureTask)作解析。
定时任务执行单元ScheduledFutureTask
延迟队列DelayedWorkQueue基于数组实现的二叉小顶堆,延迟时间最小的任务排在堆顶;
ScheduledThreadPoolExecutor的工作线程从队列取任务时,先判断堆顶任务的剩余延时:已到期则直接取出;未到期时,如果已有其他线程作为leader在限时等待,则当前线程无限期等待available.await();否则当前线程成为leader,并等待对应的延时available.awaitNanos(delay)。
继承关系
1 2
| /**
 * Unit of work handled by the scheduled executor: a {@code FutureTask} that also
 * implements {@code RunnableScheduledFuture}.
 * Declared as a non-static (inner) class; its members are elided in this excerpt
 * and shown piece by piece in the sections that follow.
 */
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {}
|
成员变量
1 2 3 4 5 6 7 8 9 10
| /** Tie-breaker used by compareTo when two tasks have the same {@code time}; the smaller number sorts first. */
private final long sequenceNumber;
/** Trigger time of the next run; rewritten by setNextRunTime for periodic tasks. Presumably in nanoseconds - confirm against triggerTime(). */
private volatile long time;
/**
 * Repeat period. 0 means the task is not periodic (see isPeriodic).
 * A positive value is added to {@code time} after each run; any other non-zero
 * value makes the next time {@code triggerTime(-period)} (see setNextRunTime).
 */
private final long period;
/** The task handed to reExecutePeriodic when re-queueing; defaults to this task and is replaced by scheduleAtFixedRate with the decorated task. */
RunnableScheduledFuture<V> outerTask = this;
/** Position of this task in the DelayedWorkQueue array, maintained by setIndex; finishPoll sets it to -1 on removal. */
int heapIndex;
|
构造器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| /**
 * Creates a one-shot task from a Runnable and a fixed result.
 * Equivalent to the periodic constructor with a period of 0.
 *
 * @param r              the action to run
 * @param result         the value reported on completion
 * @param triggerTime    the time at which the task becomes runnable
 * @param sequenceNumber tie-breaker for tasks with equal trigger time
 */
ScheduledFutureTask(Runnable r, V result, long triggerTime, long sequenceNumber) {
    this(r, result, triggerTime, 0L, sequenceNumber);
}

/**
 * Creates a task from a Runnable with an explicit period.
 *
 * @param r              the action to run
 * @param result         the value reported on completion
 * @param triggerTime    the time at which the task first becomes runnable
 * @param period         repeat period; 0 for a one-shot task
 * @param sequenceNumber tie-breaker for tasks with equal trigger time
 */
ScheduledFutureTask(Runnable r, V result, long triggerTime, long period, long sequenceNumber) {
    super(r, result);
    this.sequenceNumber = sequenceNumber;
    this.period = period;
    this.time = triggerTime;
}

/**
 * Creates a one-shot task from a Callable.
 *
 * @param callable       the computation to run
 * @param triggerTime    the time at which the task becomes runnable
 * @param sequenceNumber tie-breaker for tasks with equal trigger time
 */
ScheduledFutureTask(Callable<V> callable, long triggerTime, long sequenceNumber) {
    super(callable);
    this.sequenceNumber = sequenceNumber;
    this.period = 0;
    this.time = triggerTime;
}
|
比较compareTo(Delayed other)
ScheduledFutureTask比较时先比较下次运行时间(time),如果相等再比较sequenceNumber(序号小的、即先提交的任务排在前面)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| /**
 * Orders tasks for the delay queue.
 * Two ScheduledFutureTask instances are ordered by trigger time, then by
 * sequence number; any other Delayed is ordered by remaining delay.
 *
 * @param other the object to compare against
 * @return 0 for the same instance, otherwise -1 / 1 (0 is also possible
 *         against a foreign Delayed with an equal remaining delay)
 */
public int compareTo(Delayed other) {
    if (other == this) {
        return 0;
    }
    if (!(other instanceof ScheduledFutureTask)) {
        // Foreign Delayed implementation: fall back to comparing remaining delays.
        long delayGap = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        if (delayGap < 0) {
            return -1;
        }
        return (delayGap > 0) ? 1 : 0;
    }
    ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) other;
    long timeGap = time - that.time;
    if (timeGap != 0) {
        return (timeGap < 0) ? -1 : 1;
    }
    // Same trigger time: the lower sequence number wins; never returns 0 here.
    return (sequenceNumber < that.sequenceNumber) ? -1 : 1;
}
|
是否周期运行isPeriodic()
1 2 3
| /**
 * Reports whether this task repeats.
 *
 * @return true when {@code period} is non-zero
 */
public boolean isPeriodic() { return period != 0; }
|
设置下次运行时间
1 2 3 4 5 6 7
| private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }
|
执行任务run()
是否能运行依赖于ScheduledThreadPoolExecutor的canRunInCurrentRunState方法
run方法本身不判断是否到了执行时间:到期判断由DelayedWorkQueue的take()负责,任务只有在延时小于等于0时才会被取出并交给工作线程执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| /**
 * Executes the task.
 * Cancels it if the executor's run state no longer allows it to run,
 * runs it once if it is not periodic, and otherwise runs-and-resets it;
 * after a successful periodic run the next trigger time is computed and
 * the task is re-queued through reExecutePeriodic.
 */
public void run() {
    if (!canRunInCurrentRunState(this)) {
        cancel(false);
        return;
    }
    if (!isPeriodic()) {
        super.run();
        return;
    }
    // Periodic task: only reschedule when this run completed without failure or cancellation.
    if (super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}
|
是否能运行canRunInCurrentRunState(RunnableScheduledFuture<?> task)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| /**
 * Decides whether a task may run given the executor's current run state.
 *
 * @param task the task to check
 * @return true if not shut down; false if stopped; otherwise, for a periodic
 *         task, the continue-periodic-after-shutdown flag, and for a delayed
 *         task, true when the execute-delayed-after-shutdown flag is set or
 *         the task's delay has already elapsed
 */
boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
    if (!isShutdown()) {
        return true;
    }
    if (isStopped()) {
        return false;
    }
    if (task.isPeriodic()) {
        return continueExistingPeriodicTasksAfterShutdown;
    }
    return executeExistingDelayedTasksAfterShutdown
        || task.getDelay(NANOSECONDS) <= 0;
}
|
reExecutePeriodic()
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| /**
 * Puts a periodic task back into the work queue after a run, unless the
 * current run state forbids it, in which case the task is cancelled.
 *
 * @param task the task to re-queue
 */
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(task)) {
        super.getQueue().add(task);
        // Checked again after the add (presumably because the run state can
        // change in between - confirm). If the task may still run, or it could
        // not be removed from the queue, make sure a worker exists to take it.
        if (canRunInCurrentRunState(task) || !remove(task)) {
            ensurePrestart();
            return;
        }
    }
    // Reached when the task must not run: either it was never queued or it was removed again.
    task.cancel(false);
}
|
延迟队列DelayedWorkQueue
继承关系
1 2
| /**
 * Work queue of the scheduled executor: every ScheduledThreadPoolExecutor
 * constructor shown in this article passes a new instance to the superclass.
 * A static nested class; its members are elided in this excerpt and shown
 * piece by piece in the sections that follow.
 */
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {}
|
成员变量
1 2 3 4 5 6 7 8
| /** Initial length of the backing array. */
private static final int INITIAL_CAPACITY = 16;
/** Backing array of the heap; queue[0] is the head that peek() and take() look at. */
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
/** Lock held by peek(), offer() and take() while they read or modify the heap. */
private final ReentrantLock lock = new ReentrantLock();
/** Number of tasks currently stored in {@code queue}. */
private int size;
// NOTE(review): offer() and take() also use the fields `leader` and `available`,
// whose declarations are not included in this excerpt.
|
DelayedWorkQueue队列的底层结构使用数组来存储,元素按二叉小顶堆的方式组织:queue[0]是堆顶,即延时最小的任务。
底层数据操作方法
设置堆排序数组位置
1 2 3 4
| /**
 * Records a task's position in the heap array.
 * Only ScheduledFutureTask instances carry a heapIndex; any other
 * implementation is left untouched.
 *
 * @param f   the task whose index is being recorded
 * @param idx the task's new position in the array
 */
private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
    if (!(f instanceof ScheduledFutureTask)) {
        return;
    }
    ((ScheduledFutureTask) f).heapIndex = idx;
}
|
上浮siftUp:从位置k开始与父节点比较并向上调整(插入元素时使用)
1 2 3 4 5 6 7 8 9 10 11 12 13
| /**
 * Inserts key at position k and restores heap order by moving it towards
 * the root: every parent that compares greater than key is shifted one
 * level down, and key is stored in the slot that is finally freed.
 *
 * @param k   the position to start from
 * @param key the task to place
 */
private void siftUp(int k, RunnableScheduledFuture<?> key) {
    int slot = k;
    while (slot > 0) {
        final int parentSlot = (slot - 1) >>> 1;
        final RunnableScheduledFuture<?> parentTask = queue[parentSlot];
        if (key.compareTo(parentTask) >= 0) {
            break; // parent is not greater: key belongs in this slot
        }
        // Pull the parent down into the current slot and continue one level up.
        queue[slot] = parentTask;
        setIndex(parentTask, slot);
        slot = parentSlot;
    }
    queue[slot] = key;
    setIndex(key, slot);
}
|
下沉siftDown:从位置k开始与较小的子节点比较并向下调整(取出堆顶后重建堆时使用)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| /**
 * Inserts key at position k and restores heap order by moving it towards
 * the leaves: while key compares greater than its smaller child, that
 * child is shifted one level up; key is stored in the slot finally freed.
 *
 * @param k   the position to start from
 * @param key the task to place
 */
private void siftDown(int k, RunnableScheduledFuture<?> key) {
    final int firstLeaf = size >>> 1; // slots from here on have no children
    int slot = k;
    while (slot < firstLeaf) {
        int smaller = (slot << 1) + 1; // start with the left child
        final int right = smaller + 1;
        if (right < size && queue[smaller].compareTo(queue[right]) > 0) {
            smaller = right;
        }
        final RunnableScheduledFuture<?> child = queue[smaller];
        if (key.compareTo(child) <= 0) {
            break; // key is not greater than its smaller child
        }
        queue[slot] = child;
        setIndex(child, slot);
        slot = smaller;
    }
    queue[slot] = key;
    setIndex(key, slot);
}
|
扩容grow()
1.5倍扩容
1 2 3 4 5 6 7
| /**
 * Enlarges the backing array by half of its current length.
 * If the computed length overflows to a negative value it is replaced by
 * Integer.MAX_VALUE before the array is copied.
 */
private void grow() {
    final int current = queue.length;
    final int enlarged = current + (current >> 1);
    final int target = (enlarged < 0) ? Integer.MAX_VALUE : enlarged;
    queue = Arrays.copyOf(queue, target);
}
|
peek()
1 2 3 4 5 6 7 8 9
| /**
 * Returns the head of the queue without removing it.
 * The array is read while holding the queue lock.
 *
 * @return the task at queue[0], or null if that slot is empty
 */
public RunnableScheduledFuture<?> peek() {
    final ReentrantLock guard = this.lock;
    RunnableScheduledFuture<?> head;
    guard.lock();
    try {
        head = queue[0];
    } finally {
        guard.unlock();
    }
    return head;
}
|
offer(Runnable x)
将元素x放到堆的末尾并通过siftUp上浮到合适位置(必要时先扩容);如果x成为新的堆顶,则清空leader并唤醒一个等待的消费者线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| /**
 * Adds a task to the heap. Never blocks for space: the array is grown when full.
 *
 * @param x the task to add; cast to RunnableScheduledFuture
 * @return always true
 * @throws NullPointerException if x is null
 */
public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            grow();
        size = i + 1;
        if (i == 0) {
            // Empty heap: the new task is the root, no sifting needed.
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e);
        }
        // The new task became the head: clear the current leader and wake one
        // waiter so that it re-reads the head's delay.
        if (queue[0] == e) {
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

/** Same as offer(e); the result is discarded. */
public void put(Runnable e) { offer(e); }
/** Same as offer(e). */
public boolean add(Runnable e) { return offer(e); }
/** Same as offer(e); the timeout arguments are ignored. */
public boolean offer(Runnable e, long timeout, TimeUnit unit) { return offer(e); }
|
取元素take()
- 获得锁
- 循环:
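查看头结点元素:队列为空则等待await();头结点已到期(delay <= 0)则调用finishPoll出队并返回;未到期时,如果leader不为空(已有其他线程在限时等待)则当前线程等待await(),否则把当前线程设为leader并等待对应的延时available.awaitNanos(delay),等待结束后如果leader仍是当前线程则置空。
- 退出前(finally中):leader为空且头结点不为空,则available.signal()唤醒其他等待线程;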
- 释放锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| /**
 * Removes and returns the head task, waiting until its delay has elapsed.
 *
 * @return the head task, once its delay is less than or equal to zero
 * @throws InterruptedException declared for the lock acquisition and the waits
 */
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                available.await(); // empty queue: wait until signalled
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return finishPoll(first);
                first = null; // drop the local reference before waiting
                if (leader != null)
                    available.await(); // another thread is already the leader: wait without timeout
                else {
                    // Become the leader and wait only as long as the head's remaining delay.
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        // Step down only if leadership was not cleared or taken over meanwhile.
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // On the way out, wake another waiter if nobody is leading and tasks remain.
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
|
取出元素后更新堆排序finishPoll
1 2 3 4 5 6 7 8 9
| /**
 * Completes the removal of the head task: shrinks the heap by one, moves
 * the former last element to the root and sifts it down (when any element
 * remains), and marks the removed task with a heap index of -1.
 *
 * @param f the task being removed (the caller passes the current head)
 * @return f
 */
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    final int lastSlot = --size;
    final RunnableScheduledFuture<?> tail = queue[lastSlot];
    queue[lastSlot] = null;
    if (lastSlot != 0) {
        siftDown(0, tail);
    }
    setIndex(f, -1);
    return f;
}
|
定时任务线程池ScheduledThreadPoolExecutor
继承关系
1 2 3
| /**
 * Thread pool that runs tasks after a delay or periodically.
 * Extends ThreadPoolExecutor and implements ScheduledExecutorService;
 * its members are elided in this excerpt and shown in the sections that follow.
 */
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {}
|
成员变量
1 2 3 4 5 6 7 8 9 10
| /** Returned by canRunInCurrentRunState for periodic tasks once the pool is shut down (but not stopped); no initializer, so it starts as false. */
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
/** When true, canRunInCurrentRunState lets non-periodic tasks run after shutdown even if their delay has not elapsed; initialised to true. */
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
/** NOTE(review): not referenced by the code in this excerpt; presumably controls whether cancelled tasks are removed from the queue - confirm. */
volatile boolean removeOnCancel;
/** Source of the sequence numbers given to new ScheduledFutureTask instances (via getAndIncrement). */
private static final AtomicLong sequencer = new AtomicLong();
/** Keep-alive value passed, with MILLISECONDS as the unit, to the superclass constructor. */
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
|
构造器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| // All four constructors delegate to ThreadPoolExecutor with the same fixed
// arguments: Integer.MAX_VALUE as the second (maximum pool size) argument,
// DEFAULT_KEEPALIVE_MILLIS / MILLISECONDS as keep-alive, and a fresh
// DelayedWorkQueue as the work queue. They differ only in which optional
// collaborators (thread factory, rejection handler) are forwarded.

/**
 * @param corePoolSize  forwarded to the superclass as the core pool size
 * @param threadFactory forwarded to the superclass
 */
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory);
}

/**
 * @param corePoolSize forwarded to the superclass as the core pool size
 * @param handler      forwarded to the superclass
 */
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), handler);
}

/**
 * @param corePoolSize forwarded to the superclass as the core pool size
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue());
}

/**
 * @param corePoolSize  forwarded to the superclass as the core pool size
 * @param threadFactory forwarded to the superclass
 * @param handler       forwarded to the superclass
 */
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory, handler);
}
|
核心方法
普通任务执行入口方法
1 2 3 4 5 6 7 8 9 10 11 12
| // The plain executor entry points are all expressed as schedule(...) with a
// delay of 0 NANOSECONDS, so every task goes through the delay queue.

/** Runs command through schedule with zero delay; the returned future is discarded. */
public void execute(Runnable command) {
    schedule(command, 0, NANOSECONDS);
}

/** Submits task through schedule with zero delay and returns the resulting future. */
public Future<?> submit(Runnable task) {
    return schedule(task, 0, NANOSECONDS);
}

/** Wraps task and result with Executors.callable and schedules the wrapper with zero delay. */
public <T> Future<T> submit(Runnable task, T result) {
    return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}

/** Submits the callable through schedule with zero delay and returns the resulting future. */
public <T> Future<T> submit(Callable<T> task) {
    return schedule(task, 0, NANOSECONDS);
}
|
延时/定时任务执行入口方法
decorateTask() 的默认实现不做任何包装,直接返回传入的task,是留给子类重写的扩展点。
延时/定时任务的核心就在于delayedExecute(t)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| /**
 * Schedules a one-shot Runnable.
 *
 * @param command the action to run
 * @param delay   the delay before the run, in {@code unit}
 * @param unit    the time unit of {@code delay}
 * @return the (possibly decorated) task handed to delayedExecute
 * @throws NullPointerException if command or unit is null
 */
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<Void> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit), sequencer.getAndIncrement()));
    delayedExecute(t);
    return t;
}

/**
 * Schedules a one-shot Callable.
 *
 * @param callable the computation to run
 * @param delay    the delay before the run, in {@code unit}
 * @param unit     the time unit of {@code delay}
 * @return the (possibly decorated) task handed to delayedExecute
 * @throws NullPointerException if callable or unit is null
 */
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit), sequencer.getAndIncrement()));
    delayedExecute(t);
    return t;
}

/**
 * Schedules a periodic Runnable with a positive period.
 *
 * @param command      the action to run
 * @param initialDelay the delay before the first run, in {@code unit}
 * @param period       the period, in {@code unit}; converted with unit.toNanos
 * @param unit         the time unit of initialDelay and period
 * @return the (possibly decorated) task handed to delayedExecute
 * @throws NullPointerException     if command or unit is null
 * @throws IllegalArgumentException if period is less than or equal to zero
 */
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0L)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period), sequencer.getAndIncrement());
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    // Re-queueing after each run must use the decorated task, not the raw one.
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
|
任务提交核心delayedExecute(RunnableScheduledFuture<?> task)
线程池已关闭时直接拒绝任务;否则将任务加入到延迟队列中,并通过ensurePrestart()确保有工作线程来取任务(如果入队后发现任务已不允许运行且能从队列中移除,则取消该任务)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| /**
 * Common submission path for delayed and periodic tasks.
 * Rejects the task if the pool is shut down; otherwise queues it and makes
 * sure a worker exists, unless the task turns out to be no longer runnable
 * and can be removed again, in which case it is cancelled.
 *
 * @param task the task to queue
 */
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        // Checked after the add (presumably because the run state can change
        // between the isShutdown() test and the add - confirm).
        if (!canRunInCurrentRunState(task) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

/**
 * Starts a worker without an initial task when the pool has fewer workers
 * than corePoolSize, or when it has none at all.
 * NOTE(review): the meaning of addWorker's boolean argument is not visible
 * in this excerpt; presumably it selects the core vs. maximum bound - confirm.
 */
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}
|