【多线程】ThreadPoolExecutor类万字源码解析(注解超级详细)
線程池
線程池初始化時是沒有創建線程的,線程池里的線程的初始化與其他線程一樣,但是在完成任務以后,該線程不會自行銷毀,而是以掛起的狀態返回到線程池。直到應用程序再次向線程池發出請求時,線程池里掛起的線程就會再度激活執行任務。這樣既節省了建立線程所造成的性能損耗,也可以讓多個任務反復重用同一線程,從而在應用程序生存期內節約大量開銷
public ThreadPoolExecutor(// 線程池核心線程數int corePoolSize, // 線程池最大數int maximumPoolSize, // 空閑線程存活時間long keepAliveTime, // 時間單位TimeUnit unit,// 線程池所使用的緩沖隊列BlockingQueue<Runnable> workQueue,// 線程池創建線程使用的工廠ThreadFactory threadFactory,// 線程池對拒絕任務的處理策略RejectedExecutionHandler handler)ThreadPoolExecutor 源碼分析
線程池狀態
ThreadPoolExecutor有一個AtomicInteger變量,叫ctl(control的簡寫),一共32位,高3位存線程池狀態runState(一共5種狀態:Running,Shutdown,Stop,Tidying,Terminate),低29位存當前有效線程數workerCount
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//COUNT_BITS = 29private static final int COUNT_BITS = Integer.SIZE - 3;// 線程池最大線程數 = 536870911(2^29-1)// 將 1 的二進制向右位移 29 位,再減 1 表示最大線程容量// CAPACITY :00011111111111111111111111111111// 高3位000,低29位全為1private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits// 運行狀態保存在 int 值的高 3 位 (所有數值左移 29 位)// RUNNING :11100000000000000000000000000000// 高3位111,低29位全為0private static final int RUNNING = -1 << COUNT_BITS;// SHUTDOWN :00000000000000000000000000000000// 高3位000,低29位全為0private static final int SHUTDOWN = 0 << COUNT_BITS;// STOP :00100000000000000000000000000000// 高3位001,低29位全為0private static final int STOP = 1 << COUNT_BITS;// TIDYING :01000000000000000000000000000000// 高3位010,低29位全為0private static final int TIDYING = 2 << COUNT_BITS;// TERMINATED:01100000000000000000000000000000// 高3位011,低29位全為0private static final int TERMINATED = 3 << COUNT_BITS;分析:
事實上COUNT_BITS = 29,而上面的5重線程狀態實際上是使用32位中的高3位來表示,低29位存線程數,這樣線程池的狀態和線程數量就由一個變量存儲,即:
- RUNNING = 111: 線程池正常運行,可以接受新的任務并處理隊列中的任務。
- SHUTDOWN = 000:關閉狀態,不再接受新的任務,但是會執行隊列中的任務。在線程池處于 RUNNING 狀態時,調用 shutdown()方法會使線程池進入到該狀態。(finalize() 方法在執行過程中也會調用shutdown()方法進入該狀態);
- STOP = 001:不再接受新任務,不處理隊列中的任務,中斷進行中的任務。在線程池處于 RUNNING 或 SHUTDOWN 狀態時,調用 shutdownNow() 方法會使線程池進入到該狀態;
- TIDYING = 010:所有任務已經終止,workerCount為0,線程狀態轉換到TIDYING,線程池進入該狀態后會調用 terminated() 方法進入TERMINATED 狀態。
- TERMINATED = 011:terminate()函數執行完成后進入該狀態。
execute 方法
public void execute(Runnable command) {/*** 源代碼閱讀說明:* 1、這里的command就是task;因為這里使用了命令模式,所以用command命名;執行task,task被包裝成了一個work。* 2、isRunning檢查當前ThreadPoolExecutor是否是運行狀態* 3、workerCountOf會根據ThreadPoolExecutor的狀態值獲得當前正的正在執行的和等待的work數量。* 這里先不用深究isRunning和workerCounterOf方法的實現。* 4、addWork方法會新增一個線程,返回true表示新增成功,返回false表示新增失敗*/if (command == null)// 如果任務為null,則拋出 NullPointerException 異常throw new NullPointerException();// 獲取當前線程池的狀態 + 線程個數變量的組合值int c = ctl.get();// 當前線程池線程個數是否小于 corePoolSize,小于則開啟新線程運行if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 如果線程池處于RUNNING狀態,核心池已滿,但任務隊列未滿,則添加任務到阻塞隊列if (isRunning(c) && workQueue.offer(command)) {// 任務成功添加到隊列以后,再次檢查是否需要添加新的線程,因為已存在的線程可能被銷毀了int recheck = ctl.get();// 如果當前線程池狀態不是RUNNING則從隊列刪除任務,并執行拒絕策略if (! isRunning(recheck) && remove(command))reject(command);// 否者如果當前線程池線程空,則添加一個線程else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 核心池已滿,隊列已滿,試著創建一個新線程else if (!addWorker(command, false))// 如果創建新線程失敗了,說明線程池被關閉或者線程池完全滿了,拒絕任務reject(command);}execute 方法 執行流程
1、首先判斷任務是否為空,空則拋出空指針異常
2、不為空則獲取線程池控制狀態,判斷小于corePoolSize(核心線程),添加到worker集合當中執行,
- 如成功,則返回
- 失敗的話再接著獲取線程池控制狀態,因為只有狀態變了才會失敗,所以重新獲取
3、判斷線程池是否處于運行狀態,是的話則添加 command 到阻塞隊列,加入時也會再次獲取狀態并且檢測狀態是否不處于運行狀態,不處于的話則將 command 從阻塞隊列移除,并且拒絕任務
4、如果線程池里沒有了線程,則創建新的線程去執行獲取阻塞隊列的任務執行
5、如果以上都沒執行成功,則需要開啟最大線程池里的線程來執行任務,失敗的話就丟棄
addWorker方法
如果工作線程數小于核心線程數的話,會調用 addWorker ,顧名思義,其實就是要創建一個工作線程。我們來看看源碼的實現
源碼比較長,其實就做了兩件事。
- 才用循環 CAS 操作來將線程數加 1;
- 新建一個線程并啟用。
addWorker方法 執行流程
1、獲取線程池的控制狀態,進行判斷,不符合則返回false,符合則下一步
2、死循環,判斷workerCount是否大于上限,或者大于corePoolSize/maximumPoolSize,沒有的話則對workerCount+1操作,
3、如果不符合上述判斷或+1操作失敗,再次獲取線程池的控制狀態,獲取runState與剛開始獲取的runState相比,不一致則跳出內層循環繼續外層循環,否則繼續內層循環
4、+1操作成功后,使用重入鎖ReentrantLock來保證往workers當中添加worker實例,添加成功就啟動該實例。
Worker內部類
可以發現 addWorker 方法只是構造了一個 Worker,并且把 firstTask 封裝到 worker 中,它是做什么的呢?我們來瞧瞧
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** * Thread this worker is running in. Null if factory fails. * 此工作線程正在運行的線程。如果工廠失敗,則為null。*/// 這才是真正執行 task 的線程,從構造函數可知是由ThreadFactury 創建的final Thread thread;/** * Initial task to run. Possibly null. * 要運行的初始任務。可能為空*/// 這就是需要執行的 taskRunnable firstTask;/** * Per-thread task counter * 每線程任務計數器*/// 完成的任務數,用于線程池統計volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*//*** 創建并初始化第一個任務,使用線程工廠來創建線程* 初始化有3步* 1、設置AQS的同步狀態為-1,表示該對象需要被喚醒* 2、初始化第一個任務* 3、調用ThreadFactory來使自身創建一個線程,并賦值給worker的成員變量thread*/Worker(Runnable firstTask) {// 初始狀態 -1,防止在調用 runWorker(),也就是真正執行 task前中斷 thread。setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;// 創建一個線程this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */// 重寫Runnable的run方法public void run() {// 調用ThreadPoolExecutor的runWorker方法runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.// 代表是否獨占鎖,0-非獨占 1-獨占protected boolean isHeldExclusively() {return getState() != 0;}// 重寫AQS的tryAcquire方法嘗試獲取鎖protected boolean tryAcquire(int unused) {// 嘗試將AQS的同步狀態從0改為1if (compareAndSetState(0, 1)) {// 如果改變成,則將當前獨占模式的線程設置為當前線程并返回truesetExclusiveOwnerThread(Thread.currentThread());// 成功獲得鎖return true;}// 線程進入等待隊列return false;}// 重寫AQS的tryRelease嘗試釋放鎖protected boolean tryRelease(int unused) {// 設置當前獨占模式的線程為nullsetExclusiveOwnerThread(null);// 設置AQS同步狀態為0setState(0);// 返回truereturn true;}// 獲取鎖public void lock() { acquire(1); }// 嘗試獲取鎖public boolean tryLock() { return tryAcquire(1); }// 釋放鎖public void unlock() { release(1); }// 是否被獨占public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}addWorkerFailed方法
addWorker方法添加worker失敗,并且沒有成功啟動任務的時候,就會調用此方法,將任務從workers中移除,并且workerCount做-1操作。
private void addWorkerFailed(Worker w) {// 重入鎖final ReentrantLock mainLock = this.mainLock;// 獲取鎖mainLock.lock();try {// 如果worker不為nullif (w != null)// workers移除workerworkers.remove(w);// 通過CAS操作,workerCount-1decrementWorkerCount();tryTerminate();} finally {// 釋放鎖mainLock.unlock();}}tryTerminate方法
當對線程池執行了非正常成功邏輯的操作時,都會需要執行tryTerminate嘗試終止線程池
final void tryTerminate() {// 死循環for (;;) {// 獲取線程池控制狀態int c = ctl.get();/** 線程池處于RUNNING狀態* 線程池狀態最小大于TIDYING* 線程池==SHUTDOWN并且workQUeue不為空* 直接return,不能終止*/if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 如果workerCount不為0if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}// 獲取線程池的鎖final ReentrantLock mainLock = this.mainLock;// 獲取鎖mainLock.lock();try {// 通過CAS操作,設置線程池狀態為TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {// 設置線程池的狀態為TERMINATEDctl.set(ctlOf(TERMINATED, 0));// 發送釋放信號給在termination條件上等待的線程termination.signalAll();}return;}} finally {// 釋放鎖mainLock.unlock();}// else retry on failed CAS}}runWorker方法
該方法的作用就是去執行任務
final void runWorker(Worker w) {// 獲取當前線程Thread wt = Thread.currentThread();// 獲取worker里的任務Runnable task = w.firstTask;// 將worker實例的任務賦值為nullw.firstTask = null;/*** unlock方法會調用AQS的release方法* release方法會調用具體實現類也就是Worker的tryRelease方法* 也就是將AQS狀態置為0,允許中斷*/w.unlock(); // allow interrupts// 是否突然完成boolean completedAbruptly = true;try {// worker實例的task不為空,或者通過getTask獲取的不為空while (task != null || (task = getTask()) != null) {// 獲取鎖w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt/*** 獲取線程池的控制狀態,至少要大于STOP狀態* 如果狀態不對,檢查當前線程是否中斷并清除中斷狀態,并且再次檢查線程池狀態是否大于STOP* 如果上述滿足,檢查該對象是否處于中斷狀態,不清除中斷標記*/if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())// 中斷改對象wt.interrupt();try {// 執行前的方法,由子類具體實現beforeExecute(wt, task);Throwable thrown = null;try {// 執行任務task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 執行完后調用的方法,也是由子類具體實現afterExecute(task, thrown);}} finally { // 執行完后// task設置為nulltask = null;// 已完成任務數+1w.completedTasks++;// 釋放鎖w.unlock();}}completedAbruptly = false;} finally {// 處理并退出當前workerprocessWorkerExit(w, completedAbruptly);}}runWorker方法 執行流程
1,首先在方法一進來,就執行了w.unlock(),這是為了將AQS的狀態改為0,因為只有getState() >= 0的時候,線程才可以被中斷;
2,判斷firstTask是否為空,為空則通過getTask()獲取任務,不為空接著往下執行
3,判斷是否符合中斷狀態,符合的話設置中斷標記
4,執行beforeExecute(),task.run(),afterExecute()方法
5,任何一個出異常都會導致任務執行的終止;進入processWorkerExit來退出任務
6,正常執行的話會接著回到步驟2
getTask方法
在上面的runWorker方法當中我們可以看出,當firstTask為空的時候,會通過該方法來接著獲取任務去執行,那我們就看看獲取任務這個方法到底是怎么樣的?
private Runnable getTask() {// 標志是否獲取任務超時boolean timedOut = false; // Did the last poll() time out?for (;;) {// 獲取線程池的控制狀態int c = ctl.get();// 獲取線程池的runStateint rs = runStateOf(c);// Check if queue empty only if necessary./*** 判斷線程池的狀態,出現以下兩種情況* 1、runState大于等于SHUTDOWN狀態* 2、runState大于等于STOP或者阻塞隊列為空* 將會通過CAS操作,進行workerCount-1并返回null*/if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}// 獲取線程池的workerCountint wc = workerCountOf(c);// Are workers subject to culling?/*** allowCoreThreadTimeOut:是否允許core Thread超時,默認false* workerCount是否大于核心核心線程池*/boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;/*** 1、wc大于maximumPoolSize或者已超時* 2、隊列不為空時保證至少有一個任務*/if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {/*** 通過CAS操作,workerCount-1* 能進行-1操作,證明wc大于maximumPoolSize或者已經超時*/if (compareAndDecrementWorkerCount(c))// -1操作成功,返回nullreturn null;// -1操作失敗,繼續循環continue;}try {/*** wc大于核心線程池* 執行poll方法* 小于核心線程池* 執行take方法*/Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();// 判斷任務不為空返回任務if (r != null)return r;// 獲取一段時間沒有獲取到,獲取超時timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}processWorkerExit方法
明顯的,在執行任務當中,會去獲取任務進行執行,那既然是執行任務,肯定就會有執行完或者出現異常中斷執行的時候,那這時候肯定也會有相對應的操作,至于具體操作是怎么樣的,我們還是直接去看源碼最實際。
private void processWorkerExit(Worker w, boolean completedAbruptly) {/*** completedAbruptly:在runWorker出現,代表是否突然完成的意思* 也就是在執行任務過程當中出現異常,就會突然完成,傳true** 如果是突然完成,需要通過CAS操作,workerCount-1* 不是突然完成,則不需要-1,因為getTask方法當中已經-1** 下面的代碼注釋貌似與代碼意思相反了*/if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();// 生成重入鎖final ReentrantLock mainLock = this.mainLock;// 獲取鎖mainLock.lock();try {// 線程池統計的完成任務數completedTaskCount加上worker當中完成的任務數completedTaskCount += w.completedTasks;// 從HashSet<Worker>中移除workers.remove(w);} finally {// 釋放鎖mainLock.unlock();}// 因為上述操作是釋放任務或線程,所以會判斷線程池狀態,嘗試終止線程池tryTerminate();// 獲取線程池的控制狀態int c = ctl.get();// 判斷runState是否小于STOP,即是RUNNING或者SHUTDOWN// 如果是RUNNING或者SHUTDOWN,代表沒有成功終止線程池if (runStateLessThan(c, STOP)) {/** 是否突然完成* 如若不是,代表已經沒有任務可獲取完成,因為getTask當中是while循環*/if (!completedAbruptly) {/*** allowCoreThreadTimeOut:是否允許core thread超時,默認false* min-默認是corePoolSize*/int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 允許core thread超時并且隊列不為空// min為0,即允許core thread超時,這樣就不需要維護核心核心線程池了// 如果workQueue不為空,則至少保持一個線程存活if (min == 0 && ! workQueue.isEmpty())min = 1;// 如果workerCount大于min,則表示滿足所需,可以直接返回if (workerCountOf(c) >= min)return; // replacement not needed}// 如果是突然完成,添加一個空任務的worker線程addWorker(null, false);}}總結
在文章的最后,我們先對 ThreadPoolExecutor 的關鍵信息做一些總結:
線程池解決兩個問題:
- 通過減少任務間的調度開銷 (主要是通過線程池中的線程被重復使用的方式),來提高大量任務時的執行性能
- 提供了一種方式來管理線程和消費,維護基本數據統計等工作,比如統計已完成的任務數;
線程池容量相關參數:
- coreSize:當新任務提交時,發現運行的線程數小于 coreSize,一個新的線程將被創建,即使這時候其它工作線程是空閑的,可以通過 getCorePoolSize 方法獲得 coreSize
- maxSize: 當任務提交時,coreSize < 運行線程數 <= maxSize,但隊列沒有滿時,任務提交到隊列中,如果隊列滿了,在 maxSize 允許的范圍內新建線程;
- 一般來說,coreSize 和 maxSize 在線程池初始化時就已經設定了,但我們也可以通過 setCorePoolSize、setMaximumPoolSize 方法動態的修改這兩個值;
Keep-alive times 參數:
- 作用: 如果當前線程池中有超過 coreSize 的線程,并且線程空閑的時間超過 keepAliveTime,當前線程就會被回收,這樣可以避免線程沒有被使用時的資源浪費;
- 通過 setKeepAliveTime 方法可以動態的設置 keepAliveTime 的值;
- 如果設置 allowCoreThreadTimeOut 為 ture 的話,core thread 空閑時間超過 keepAliveTime 的話,也會被回收;
線程池新建時的隊列選擇有很多,比如:
- ArrayBlockingQueue,有界隊列,可以防止資源被耗盡;
- LinkedBlockingQueue,無界隊列,未消費的任務可以在隊列中等待
- SynchronousQueue,為了避免任務被拒絕,要求線程池的 maxSize 無界,缺點是當任務提交的速度超過消費的速度時,可能出現無限制的線程增長
拒絕策略:在 Executor 已經關閉或對最大線程和最大隊列都使用飽和時,可以使用 RejectedExecutionHandler 類進行異常捕捉。有如下四種處理策略:
- AbortPolicy(默認):拋出異常
- CallerRunsPolicy:不使用線程池,主線程來執行
- DiscardPolicy:直接丟棄任務
- DiscardOldestPolicy:丟棄隊列中最老任務
ExecutorService 使用線程池中的線程執行提交的任務,線程池我們可以使用 Executors 進行配置.Executors 為常用的場景設定了可直接初始化線程池的方法,比如:
- Executors#newCachedThreadPool 無界的線程池,并且可以自動回收
- Executors#newFixedThreadPool 固定大小線程池
- Executors#newSingleThreadExecutor 單個線程的線程池;
另外,線程池提供了很多可供擴展的鉤子函數,比如有:
- 提供在每個任務執行之前 beforeExecute 和執行之后 afterExecute 的鉤子方法,主要用于操作執行環境,比如初始化 ThreadLocals、收集統計數據、添加日志條目等
- 如果在執行器執行完成之后想干一些事情,可以實現 terminated 方法,如果鉤子方法執行時發生異常,工作線程可能會失敗并立即終止。
總結
以上是生活随笔為你收集整理的【多线程】ThreadPoolExecutor类万字源码解析(注解超级详细)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【多线程】ThreadPoolExecu
- 下一篇: 【多线程】ThreadPoolExecu