publicThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue){ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } publicThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory){ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } publicThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler){ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
privatevoidprocessWorkerExit(Worker w, boolean completedAbruptly){ // 异常结束则减worker数 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); // 移除worker final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate();
int c = ctl.get(); // 如果还没停止且worker数太少,则新添加一个worker if (runStateLessThan(c, STOP)) { // 自然结束的判断一下是不是小于核心线程数或者根本没有线程了 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
privatebooleanaddWorker(Runnable firstTask, boolean core){ retry: for (int c = ctl.get();;) { // Check if queue empty only if necessary. // 线程池在运行,等待队列为空则返回false if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) returnfalse;
for (;;) { // workerCountOf(c)当前工作线程数 // core为true表示当前任务是核心线程运行 // 则线程数不能超过核心线程数,core为false则线程数不能超过最大线程上限 if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) returnfalse; // cas线程数加一 if (compareAndIncrementWorkerCount(c)) break retry; // 加成功后跳出retry循环 c = ctl.get(); // Re-read ctl // 没有中断 if (runStateAtLeast(c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask);// 构建新工作类 final Thread t = w.thread;// 构造器中调用线程工厂类生成 if (t != null) { // workers操作加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get();
if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.isAlive()) // precheck that t is startable thrownew IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 添加成功则开始运行 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
添加Worker失败addWorkerFailed(Worker w)
将添加失败的worker移出workers列表,然后更新线程数尝试中断(tryTerminate)
1 2 3 4 5 6 7 8 9 10 11 12
privatevoidaddWorkerFailed(Worker w){ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
publicbooleanawaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { while (runStateLessThan(ctl.get(), TERMINATED)) { if (nanos <= 0L) returnfalse; nanos = termination.awaitNanos(nanos); } returntrue; } finally { mainLock.unlock(); } }
设置核心线程数setCorePoolSize
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
publicvoidsetCorePoolSize(int corePoolSize){ if (corePoolSize < 0 || maximumPoolSize < corePoolSize) thrownew IllegalArgumentException(); int delta = corePoolSize - this.corePoolSize; this.corePoolSize = corePoolSize; if (workerCountOf(ctl.get()) > corePoolSize) interruptIdleWorkers(); elseif (delta > 0) { // We don't really know how many new threads are "needed". // As a heuristic, prestart enough new workers (up to new // core size) to handle the current number of tasks in // queue, but stop if queue becomes empty while doing so. int k = Math.min(delta, workQueue.size()); while (k-- > 0 && addWorker(null, true)) { if (workQueue.isEmpty()) break; } } }