Java Review - 并发编程_ThreadPoolExecutor原理源码剖析
文章目錄
- 線程池主要解決兩個(gè)問題
- 類關(guān)系圖
- ctl 含義 ---- 記錄線程池狀態(tài)和線程池中線程個(gè)數(shù)
- 線程池狀態(tài) 及轉(zhuǎn)換
- 線程池參數(shù)
- 線程池類型
- mainLock & termination
- Worker
- 源碼分析
- public void execute(Runnable command)
- 新增線程addWorkder源碼分析
- 工作線程Worker的執(zhí)行
- getTask()
- processWorkerExit
- shutdown
- shutdownNow
- awaitTermination
- 小結(jié)
線程池主要解決兩個(gè)問題
-
一是當(dāng)執(zhí)行大量異步任務(wù)時(shí)線程池能夠提供較好的性能。在不使用線程池時(shí),每當(dāng)需要執(zhí)行異步任務(wù)時(shí)直接new一個(gè)線程來運(yùn)行,而線程的創(chuàng)建和銷毀是需要開銷的。線程池里面的線程是可復(fù)用的,不需要每次執(zhí)行異步任務(wù)時(shí)都重新創(chuàng)建和銷毀線程。
-
二是線程池提供了一種資源限制和管理的手段,比如可以限制線程的個(gè)數(shù),動(dòng)態(tài)新增線程等。每個(gè)ThreadPoolExecutor也保留了一些基本的統(tǒng)計(jì)數(shù)據(jù),比如當(dāng)前線程池完成的任務(wù)數(shù)目等。
另外,線程池也提供了許多可調(diào)參數(shù)和可擴(kuò)展性接口,以滿足不同情境的需要,程序員可以使用更方便的Executors的工廠方法,比如newCachedThreadPool(線程池線程個(gè)數(shù)最多可達(dá)Integer.MAX_VALUE,線程自動(dòng)回收)、newFixedThreadPool(固定大小的線程池)和newSingleThreadExecutor(單個(gè)線程)等來創(chuàng)建線程池,當(dāng)然用戶還可以自定義。
類關(guān)系圖
在上圖中,Executors其實(shí)是個(gè)工具類,里面提供了好多靜態(tài)方法,這些方法根據(jù)用戶選擇返回不同的線程池實(shí)例。
ctl 含義 ---- 記錄線程池狀態(tài)和線程池中線程個(gè)數(shù)
ThreadPoolExecutor繼承了AbstractExecutorService,成員變量ctl是一個(gè)Integer的原子變量,用來記錄線程池狀態(tài)和線程池中線程個(gè)數(shù),類似于ReentrantReadWriteLock使用一個(gè)變量來保存兩種信息。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));這里假設(shè)Integer類型是32位二進(jìn)制表示,則其中高3位用來表示線程池狀態(tài),后面29位用來記錄線程池線程個(gè)數(shù)。
/用來標(biāo)記線程池狀態(tài)(高3位),線程個(gè)數(shù)(低29位) //默認(rèn)是RUNNING狀態(tài),線程個(gè)數(shù)為0private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//線程個(gè)數(shù)掩碼位數(shù) private static final int COUNT_BITS = Integer.SIZE - 3;//線程最大個(gè)數(shù)(低29位)00011111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1;線程池狀態(tài):
//(高3位):11100000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS;//(高3位):00000000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS;//(高3位):00100000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS;//(高3位):01000000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS;//(高3位):01100000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS;// 獲取高三位 運(yùn)行狀態(tài) private static int runStateOf(int c) { return c & ~CAPACITY; }//獲取低29位 線程個(gè)數(shù) private static int workerCountOf(int c) { return c & CAPACITY; }//計(jì)算ctl新值,線程狀態(tài) 與 線程個(gè)數(shù) private static int ctlOf(int rs, int wc) { return rs | wc; }線程池狀態(tài) 及轉(zhuǎn)換
-
RUNNING:接受新任務(wù)并且處理阻塞隊(duì)列里的任務(wù)。
-
SHUTDOWN:拒絕新任務(wù)但是處理阻塞隊(duì)列里的任務(wù)。
-
STOP:拒絕新任務(wù)并且拋棄阻塞隊(duì)列里的任務(wù),同時(shí)會(huì)中斷正在處理的任務(wù)。
-
TIDYING:所有任務(wù)都執(zhí)行完(包含阻塞隊(duì)列里面的任務(wù))后當(dāng)前線程池活動(dòng)線程數(shù)為0,將要調(diào)用terminated方法。
-
TERMINATED:終止?fàn)顟B(tài)。terminated方法調(diào)用完成以后的狀態(tài)。
線程池狀態(tài)轉(zhuǎn)換列舉如下。
-
RUNNING -> SHUTDOWN :顯式調(diào)用shutdown()方法,或者隱式調(diào)用了finalize()、方法里面的shutdown()方法。
-
RUNNING 或 SHUTDOWN)-> STOP :顯式調(diào)用 shutdownNow()方法時(shí)。
-
SHUTDOWN -> TIDYING :當(dāng)線程池和任務(wù)隊(duì)列都為空時(shí)。
-
STOP -> TIDYING :當(dāng)線程池為空時(shí)。
-
TIDYING -> TERMINATED: 當(dāng) terminated() hook 方法執(zhí)行完成時(shí)
線程池參數(shù)
-
corePoolSize:線程池核心線程個(gè)數(shù)。
-
workQueue:用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列, 比如基于數(shù)組的有界ArrayBlockingQueue、基于鏈表的無界LinkedBlockingQueue、最多只有一個(gè)元素的同步隊(duì)列SynchronousQueue及優(yōu)先級(jí)隊(duì)列PriorityBlockingQueue等。
-
maximunPoolSize:線程池最大線程數(shù)量。
-
ThreadFactory:創(chuàng)建線程的工廠。
-
RejectedExecutionHandler:飽和策略,當(dāng)隊(duì)列滿并且線程個(gè)數(shù)達(dá)到maximunPoolSize后采取的策略。
比如
AbortPolicy(拋出異常)、
CallerRunsPolicy(使用調(diào)用者所在線程來運(yùn)行任務(wù))、
DiscardOldestPolicy(調(diào)用poll丟棄一個(gè)任務(wù),執(zhí)行當(dāng)前任務(wù))
DiscardPolicy(默默丟棄,不拋出異常) -
keeyAliveTime:存活時(shí)間。如果當(dāng)前線程池中的線程數(shù)量比核心線程數(shù)量多,并且是閑置狀態(tài),則這些閑置的線程能存活的最大時(shí)間。
-
TimeUnit:存活時(shí)間的時(shí)間單位
線程池類型
- newFixedThreadPool :創(chuàng)建一個(gè)核心線程個(gè)數(shù)和最大線程個(gè)數(shù)都為nThreads的線程池,并且阻塞隊(duì)列長(zhǎng)度為Integer.MAX_VALUE。keeyAliveTime=0說明只要線程個(gè)數(shù)比核心線程個(gè)數(shù)多并且當(dāng)前空閑則回收
- newSingleThreadExecutor: 創(chuàng)建一個(gè)核心線程個(gè)數(shù)和最大線程個(gè)數(shù)都為1的線程池,并且阻塞隊(duì)列長(zhǎng)度為Integer.MAX_VALUE。keeyAliveTime=0說明只要線程個(gè)數(shù)比核心線程個(gè)數(shù)多并且當(dāng)前空閑則回收。
- newCachedThreadPool :創(chuàng)建一個(gè)按需創(chuàng)建線程的線程池,初始線程個(gè)數(shù)為0,最多線程個(gè)數(shù)為Integer.MAX_VALUE,并且阻塞隊(duì)列為同步隊(duì)列。keeyAliveTime=60說明只要當(dāng)前線程在60s內(nèi)空閑則回收。這個(gè)類型的特殊之處在于,加入同步隊(duì)列的任務(wù)會(huì)被馬上執(zhí)行,同步隊(duì)列里面最多只有一個(gè)任務(wù)。
mainLock & termination
/*** Lock held on access to workers set and related bookkeeping.* While we could use a concurrent set of some sort, it turns out* to be generally preferable to use a lock. Among the reasons is* that this serializes interruptIdleWorkers, which avoids* unnecessary interrupt storms, especially during shutdown.* Otherwise exiting threads would concurrently interrupt those* that have not yet interrupted. It also simplifies some of the* associated statistics bookkeeping of largestPoolSize etc. We* also hold mainLock on shutdown and shutdownNow, for the sake of* ensuring workers set is stable while separately checking* permission to interrupt and actually interrupting.*/private final ReentrantLock mainLock = new ReentrantLock(); /*** Wait condition to support awaitTermination*/private final Condition termination = mainLock.newCondition();-
mainLock是獨(dú)占鎖,用來控制新增Worker線程操作的原子性。
-
termination是該鎖對(duì)應(yīng)的條件隊(duì)列,在線程調(diào)用awaitTermination時(shí)用來存放阻塞的線程。
Worker
Worker繼承AQS和Runnable接口,是具體承載任務(wù)的對(duì)象。Worker繼承了AQS,自己實(shí)現(xiàn)了簡(jiǎn)單不可重入獨(dú)占鎖,其中state=0表示鎖未被獲取狀態(tài),state=1表示鎖已經(jīng)被獲取的狀態(tài),state=-1是創(chuàng)建Worker時(shí)默認(rèn)的狀態(tài),創(chuàng)建時(shí)狀態(tài)設(shè)置為-1是為了避免該線程在運(yùn)行runWorker()方法前被中斷。其中變量firstTask記錄該工作線程執(zhí)行的第一個(gè)任務(wù),thread是具體執(zhí)行任務(wù)的線程。
DefaultThreadFactory是線程工廠,newThread方法是對(duì)線程的一個(gè)修飾。其中poolNumber是個(gè)靜態(tài)的原子變量,用來統(tǒng)計(jì)線程工廠的個(gè)數(shù),threadNumber用來記錄每個(gè)線程工廠創(chuàng)建了多少線程,這兩個(gè)值也作為線程池和線程的名稱的一部分。
源碼分析
public void execute(Runnable command)
execute方法的作用是提交任務(wù)command到線程池進(jìn)行執(zhí)行。用戶線程提交任務(wù)到線程池的模型圖如下圖所示。
從該圖可以看出,ThreadPoolExecutor的實(shí)現(xiàn)實(shí)際是一個(gè)生產(chǎn)消費(fèi)模型,當(dāng)用戶添加任務(wù)到線程池時(shí)相當(dāng)于生產(chǎn)者生產(chǎn)元素,workers線程工作集中的線程直接執(zhí)行任務(wù)或者從任務(wù)隊(duì)列里面獲取任務(wù)時(shí)則相當(dāng)于消費(fèi)者消費(fèi)元素。
用戶線程提交任務(wù)的execute方法的具體代碼如下
public void execute(Runnable command) {// 1 任務(wù)為null ,拋出 npe異常if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/// 2 獲取當(dāng)前線程池的狀態(tài) + 線程個(gè)數(shù)變量的組合值 int c = ctl.get();// 3 當(dāng)前線程池中的個(gè)數(shù)是否小于corePoolSize ,小于的話則開啟新的線程 if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 4 如果線程池處于running狀態(tài),則添加任務(wù)到阻塞隊(duì)列if (isRunning(c) && workQueue.offer(command)) {// 4.1 二次檢查int recheck = ctl.get();// 4.2 如果當(dāng)前線程池的狀態(tài)不是running 則從隊(duì)列中移除任務(wù),并執(zhí)行拒絕策略if (! isRunning(recheck) && remove(command))reject(command);// 4.3 否則當(dāng)線程數(shù)數(shù)量為空,則添加一個(gè)線程 else if (workerCountOf(recheck) == 0)addWorker(null, false);} // 5 如果隊(duì)列滿,則新增線程,新增線程失敗,觸發(fā)拒絕策略 else if (!addWorker(command, false))reject(command);}-
代碼(3)判斷如果當(dāng)前線程池中線程個(gè)數(shù)小于corePoolSize,會(huì)向workers里面新增一個(gè)核心線程(core線程)執(zhí)行該任務(wù)。
-
如果當(dāng)前線程池中線程個(gè)數(shù)大于等于corePoolSize則執(zhí)行代碼(4)。如果當(dāng)前線程池處于RUNNING狀態(tài)則添加當(dāng)前任務(wù)到任務(wù)隊(duì)列。這里需要判斷線程池狀態(tài)是因?yàn)橛锌赡芫€程池已經(jīng)處于非RUNNING狀態(tài),而在非RUNNING狀態(tài)下是要拋棄新任務(wù)的。
-
如果向任務(wù)隊(duì)列添加任務(wù)成功,則代碼(4.2)對(duì)線程池狀態(tài)進(jìn)行二次校驗(yàn),這是因?yàn)樘砑尤蝿?wù)到任務(wù)隊(duì)列后,執(zhí)行代碼(4.2)前有可能線程池的狀態(tài)已經(jīng)變化了。這里進(jìn)行二次校驗(yàn),如果當(dāng)前線程池狀態(tài)不是RUNNING了則把任務(wù)從任務(wù)隊(duì)列移除,移除后執(zhí)行拒絕策略;如果二次校驗(yàn)通過,則執(zhí)行代碼(4.3)重新判斷當(dāng)前線程池里面是否還有線程,如果沒有則新增一個(gè)線程。
-
如果代碼(4)添加任務(wù)失敗,則說明任務(wù)隊(duì)列已滿,那么執(zhí)行代碼(5)嘗試新開啟線程(如上的thread3和thread4)來執(zhí)行該任務(wù),如果當(dāng)前線程池中線程個(gè)數(shù)>maximumPoolSize則執(zhí)行拒絕策略。
新增線程addWorkder源碼分析
/*** Checks if a new worker can be added with respect to current* pool state and the given bound (either core or maximum). If so,* the worker count is adjusted accordingly, and, if possible, a* new worker is created and started, running firstTask as its* first task. This method returns false if the pool is stopped or* eligible to shut down. It also returns false if the thread* factory fails to create a thread when asked. If the thread* creation fails, either due to the thread factory returning* null, or due to an exception (typically OutOfMemoryError in* Thread.start()), we roll back cleanly.** @param firstTask the task the new thread should run first (or* null if none). Workers are created with an initial first task* (in method execute()) to bypass queuing when there are fewer* than corePoolSize threads (in which case we always start one),* or when the queue is full (in which case we must bypass queue).* Initially idle threads are usually created via* prestartCoreThread or to replace other dying workers.** @param core if true use corePoolSize as bound, else* maximumPoolSize. (A boolean indicator is used here rather than a* value to ensure reads of fresh values after checking other pool* state).* @return true if successful*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary. // 6 檢查隊(duì)列是否只在必要的時(shí)候?yàn)榭?/span>if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// 7 循環(huán)CAS增加線程個(gè)數(shù) for (;;) {int wc = workerCountOf(c);// 7.1 如果線程個(gè)數(shù)超過限制 則返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 7.2 cas增加線程個(gè)數(shù),同時(shí)只能有1個(gè)線程成功 if (compareAndIncrementWorkerCount(c))break retry;// 7.3 cas失敗了,則看線程池狀態(tài)是否變化了,變化則跳到外層循環(huán)重試重新獲取線程池狀態(tài),否者內(nèi)層循環(huán)重新cas。c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}// 8 到這里,說明CAS成功了boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 8.1 創(chuàng)建Workerw = new Worker(firstTask);final Thread t = w.thread;if (t != null) {// 8.2 加獨(dú)占鎖,為了workers同步,因?yàn)榭赡芏鄠€(gè)線程調(diào)用了線程池的execute方法。final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.//8.3 重新檢查線程池的狀態(tài),避免在獲取鎖前調(diào)用了shutdown接口int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 8.4 添加任務(wù)workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}// 8.5 添加成功,則啟動(dòng)線程if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}主要分兩個(gè)部分:
- 第一部分雙重循環(huán)的目的是通過CAS操作增加線程數(shù);
- 第二部分主要是把并發(fā)安全的任務(wù)添加到workers里面,并且啟動(dòng)任務(wù)執(zhí)行。
首先來分析第一部分的代碼6
// 6 檢查隊(duì)列是否只在必要的時(shí)候?yàn)榭?/span>if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;展開!運(yùn)算后等價(jià)于
rs >= SHUTDOWN &&(rs != SHUTDOWN ||firstTask != null ||workQueue.isEmpty())也就是說下面幾種情況下會(huì)返回false:
- 當(dāng)前線程池狀態(tài)為STOP,TIDYING,TERMINATED
- 當(dāng)前線程池狀態(tài)為SHUTDOWN并且已經(jīng)有了第一個(gè)任務(wù)
- 當(dāng)前線程池狀態(tài)為SHUTDOWN并且任務(wù)隊(duì)列為空
內(nèi)層循環(huán)的作用是使用CAS操作增加線程數(shù),代碼(7.1)判斷如果線程個(gè)數(shù)超限則返回false,否則執(zhí)行代碼(7.2)CAS操作設(shè)置線程個(gè)數(shù),CAS成功則退出雙循環(huán),CAS失敗則執(zhí)行代碼(7.3)看當(dāng)前線程池的狀態(tài)是否變化了,如果變了,則再次進(jìn)入外層循環(huán)重新獲取線程池狀態(tài),否則進(jìn)入內(nèi)層循環(huán)繼續(xù)進(jìn)行CAS嘗試。
執(zhí)行到第二部分的代碼(8)時(shí)說明使用CAS成功地增加了線程個(gè)數(shù),但是現(xiàn)在任務(wù)還沒開始執(zhí)行。這里使用全局的獨(dú)占鎖來控制把新增的Worker添加到工作集workers中。代碼(8.1)創(chuàng)建了一個(gè)工作線程Worker。
代碼(8.2)獲取了獨(dú)占鎖,代碼(8.3)重新檢查線程池狀態(tài),這是為了避免在獲取鎖前其他線程調(diào)用了shutdown關(guān)閉了線程池。如果線程池已經(jīng)被關(guān)閉,則釋放鎖,新增線程失敗,否則執(zhí)行代碼(8.4)添加工作線程到線程工作集,然后釋放鎖。代碼(8.5)判斷如果新增工作線程成功,則啟動(dòng)工作線程。
工作線程Worker的執(zhí)行
用戶線程提交任務(wù)到線程池后,由Worker來執(zhí)行。先看下Worker的構(gòu)造函數(shù)。
/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;//創(chuàng)建一個(gè)線程this.thread = getThreadFactory().newThread(this);}在構(gòu)造函數(shù)內(nèi)首先設(shè)置Worker的狀態(tài)為-1,這是為了避免當(dāng)前Worker在調(diào)用runWorker方法前被中斷(當(dāng)其他線程調(diào)用了線程池的shutdownNow時(shí),如果Worker狀態(tài)>=0則會(huì)中斷該線程)。這里設(shè)置了線程的狀態(tài)為-1,所以該線程就不會(huì)被中斷了。在如下runWorker代碼中,運(yùn)行代碼(9)時(shí)會(huì)調(diào)用unlock方法,該方法把status設(shè)置為了0,所以這時(shí)候調(diào)用shutdownNow會(huì)中斷Worker線程。
/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);} final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;// 9 將state 置為0 ,允許終端w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 10 while (task != null || (task = getTask()) != null) {// 10.1 w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 10.2 執(zhí)行任務(wù)前干一些事情beforeExecute(wt, task);Throwable thrown = null;try {// 10.3 執(zhí)行任務(wù)task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 10.4 執(zhí)行任務(wù)完成后干一些事情afterExecute(task, thrown);}} finally {task = null;// 10.5 統(tǒng)計(jì)當(dāng)前Worker完成了多少任務(wù)w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {// 11 執(zhí)行清理工作 processWorkerExit(w, completedAbruptly);}}-
在如上代碼(10)中,如果當(dāng)前task==null或者調(diào)用getTask從任務(wù)隊(duì)列獲取的任務(wù)返回null,則跳轉(zhuǎn)到代碼(11)執(zhí)行。
-
如果task不為null則執(zhí)行代碼(10.1)獲取工作線程內(nèi)部持有的獨(dú)占鎖,然后執(zhí)行擴(kuò)展接口代碼(10.2)在具體任務(wù)執(zhí)行前做一些事情。代碼(10.3)具體執(zhí)行任務(wù),代碼(10.4)在任務(wù)執(zhí)行完畢后做一些事情,代碼(10.5)統(tǒng)計(jì)當(dāng)前Worker完成了多少個(gè)任務(wù),并釋放鎖。
-
這里在執(zhí)行具體任務(wù)期間加鎖,是為了避免在任務(wù)運(yùn)行期間,其他線程調(diào)用了shutdown后正在執(zhí)行的任務(wù)被中斷(shutdown只會(huì)中斷當(dāng)前被阻塞掛起的線程)
getTask()
如果當(dāng)前task為空,則直接執(zhí)行,否者調(diào)用getTask從任務(wù)隊(duì)列獲取一個(gè)任務(wù)執(zhí)行,如果任務(wù)隊(duì)列為空,則worker退出。
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 如果當(dāng)前線程池狀態(tài)>=STOP 或者線程池狀態(tài)為shutdown并且工作隊(duì)列為空則,減少工作線程個(gè)數(shù)if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}boolean timed; // Are workers subject to culling?for (;;) {int wc = workerCountOf(c);timed = allowCoreThreadTimeOut || wc > corePoolSize;if (wc <= maximumPoolSize && ! (timedOut && timed))break;if (compareAndDecrementWorkerCount(c))return null;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}try {//根據(jù)timed選擇調(diào)用poll還是阻塞的takeRunnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}} }processWorkerExit
代碼(11)執(zhí)行清理任務(wù),其代碼如下。
/*** Performs cleanup and bookkeeping for a dying worker. Called* only from worker threads. Unless completedAbruptly is set,* assumes that workerCount has already been adjusted to account* for exit. This method removes thread from worker set, and* possibly terminates the pool or replaces the worker if either* it exited due to user task exception or if fewer than* corePoolSize workers are running or queue is non-empty but* there are no workers.** @param w the worker* @param completedAbruptly if the worker died due to user exception*/private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();//統(tǒng)計(jì)整個(gè)線程池完成的任務(wù)個(gè)數(shù)final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}//嘗試設(shè)置線程池狀態(tài)為TERMINATED,如果當(dāng)前是shutdonw狀態(tài)并且工作隊(duì)列為空//或者當(dāng)前是stop狀態(tài)當(dāng)前線程池里面沒有活動(dòng)線程tryTerminate();//如果當(dāng)前線程個(gè)數(shù)小于核心個(gè)數(shù),則增加int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);} }shutdown
調(diào)用shutdown后,線程池就不會(huì)在接受新的任務(wù)了,但是工作隊(duì)列里面的任務(wù)還是要執(zhí)行的,但是該方法立刻返回的,并不等待隊(duì)列任務(wù)完成在返回。
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess(); // 12 advanceRunState(SHUTDOWN);// 13 interruptIdleWorkers();// 14 onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();// 15 }-
代碼(12)檢查看是否設(shè)置了安全管理器,是則看當(dāng)前調(diào)用shutdown命令的線程是否有關(guān)閉線程的權(quán)限,如果有權(quán)限則還要看調(diào)用線程是否有中斷工作線程的權(quán)限,如果沒有權(quán)限則拋出SecurityException或者NullPointerException異常。
-
其中代碼(13)的內(nèi)容如下,如果當(dāng)前線程池狀態(tài)>=SHUTDOWN則直接返回,否則設(shè)置為SHUTDOWN狀態(tài)。
- 代碼(14)的內(nèi)容如下,其設(shè)置所有空閑線程的中斷標(biāo)志。這里首先加了全局鎖,同時(shí)只有一個(gè)線程可以調(diào)用shutdown方法設(shè)置中斷標(biāo)志。然后嘗試獲取Worker自己的鎖,獲取成功則設(shè)置中斷標(biāo)志。由于正在執(zhí)行的任務(wù)已經(jīng)獲取了鎖,所以正在執(zhí)行的任務(wù)沒有被中斷。這里中斷的是阻塞到getTask()方法并企圖從隊(duì)列里面獲取任務(wù)的線程,也就是空閑線程。
在如上代碼中,首先使用CAS設(shè)置當(dāng)前線程池狀態(tài)為TIDYING,如果設(shè)置成功則執(zhí)行擴(kuò)展接口terminated在線程池狀態(tài)變?yōu)門ERMINATED前做一些事情,然后設(shè)置當(dāng)前線程池狀態(tài)為TERMINATED。最后調(diào)用 termination.signalAll()激活因調(diào)用條件變量termination的await系列方法而被阻塞的所有線程
shutdownNow
調(diào)用shutdownNow方法后,線程池就不會(huì)再接受新的任務(wù)了,并且會(huì)丟棄工作隊(duì)列里面的任務(wù),正在執(zhí)行的任務(wù)會(huì)被中斷,該方法會(huì)立刻返回,并不等待激活的任務(wù)執(zhí)行完成。返回值為這時(shí)候隊(duì)列里面被丟棄的任務(wù)列表。
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess(); // 16advanceRunState(STOP);// 17 interruptWorkers();//18 tasks = drainQueue();//19 } finally {mainLock.unlock();}tryTerminate();return tasks;}在如上代碼中,首先調(diào)用代碼(16)檢查權(quán)限,然后調(diào)用代碼(17)設(shè)置當(dāng)前線程池狀態(tài)為STOP,隨后執(zhí)行代碼(18)中斷所有的工作線程。這里需要注意的是,中斷的所有線程包含空閑線程和正在執(zhí)行任務(wù)的線程。
private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}}然后代碼(19)將當(dāng)前任務(wù)隊(duì)列里面的任務(wù)移動(dòng)到tasks列表。
awaitTermination
等待線程池狀態(tài)變?yōu)門ERMINATED則返回,或者時(shí)間超時(shí)。由于整個(gè)過程獨(dú)占鎖,所以一般調(diào)用shutdown或者shutdownNow后使用。
public boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (;;) {if (runStateAtLeast(ctl.get(), TERMINATED))return true;if (nanos <= 0)return false;nanos = termination.awaitNanos(nanos);}} finally {mainLock.unlock();}}-
如上代碼首先獲取獨(dú)占鎖,然后在無限循環(huán)內(nèi)部判斷當(dāng)前線程池狀態(tài)是否至少是TERMINATED狀態(tài),如果是則直接返回,否則說明當(dāng)前線程池里面還有線程在執(zhí)行 ,則看設(shè)置的超時(shí)時(shí)間nanos是否小于0,小于0則說明不需要等待,那就直接返回,如果大于0則調(diào)用條件變量termination的awaitNanos方法等待nanos時(shí)間,期望在這段時(shí)間內(nèi)線程池狀態(tài)變?yōu)門ERMINATED。
-
在shutdown方法時(shí)提到過,當(dāng)線程池狀態(tài)變?yōu)門ERMINATED時(shí),會(huì)調(diào)用termination.signalAll()用來激活調(diào)用條件變量termination的await系列方法被阻塞的所有線程,所以如果在調(diào)用awaitTermination之后又調(diào)用了shutdown方法,并且在shutdown內(nèi)部將線程池狀態(tài)設(shè)置為TERMINATED,則termination.awaitNanos方法會(huì)返回。
-
另外在工作線程Worker的runWorker方法內(nèi),當(dāng)工作線程運(yùn)行結(jié)束后,會(huì)調(diào)用processWorkerExit方法,在processWorkerExit方法內(nèi)部也會(huì)調(diào)用tryTerminate方法測(cè)試當(dāng)前是否應(yīng)該把線程池狀態(tài)設(shè)置為TERMINATED,如果是,則也會(huì)調(diào)用termination.signalAll()用來激活調(diào)用線程池的awaitTermination方法而被阻塞的線程。
-
而且當(dāng)?shù)却龝r(shí)間超時(shí)后,termination.awaitNanos也會(huì)返回,這時(shí)候會(huì)重新檢查當(dāng)前線程池狀態(tài)是否為TERMINATED,如果是則直接返回,否則繼續(xù)阻塞掛起自己。
小結(jié)
線程池巧妙地使用一個(gè)Integer類型的原子變量來記錄線程池狀態(tài)和線程池中的線程個(gè)數(shù)。通過線程池狀態(tài)來控制任務(wù)的執(zhí)行,每個(gè)Worker線程可以處理多個(gè)任務(wù)。線程池通過線程的復(fù)用減少了線程創(chuàng)建和銷毀的開銷。
總結(jié)
以上是生活随笔為你收集整理的Java Review - 并发编程_ThreadPoolExecutor原理源码剖析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java Review - 并发编程_D
- 下一篇: Java Review - 并发编程_S