ThreadPoolExecutor的execute源码分析
上一篇文章指出,ThreadPoolExecutor執行的步驟如下:
向線程池中添加任務,當任務數量少于corePoolSize時,會自動創建thead來處理這些任務;
當添加任務數大于corePoolSize且少于maximmPoolSize時,不再創建線程,而是將這些任務放到阻塞隊列中,等待被執行;
接上面2的條件,且當阻塞隊列滿了之后,繼續創建thread,從而加速處理阻塞隊列;
當添加任務大于maximmPoolSize時,根據飽和策略決定是否容許繼續向線程池中添加任務,默認的飽和策略是AbortPolicy(直接丟棄)。
我們直接可以通過ThreadPoolExecutor的execute方法源碼來跟蹤這個流程。首先,由于在execute方法中常常會根據線程池的狀態選擇判斷一些邏輯,因此在介紹該方法之前首先說一下線程池的幾種方法。
1. 線程池的狀態:
RUNNING:該狀態的線程池會接收新任務,也會處理在阻塞隊列中等待處理的任務;
SHUTDOWN:該狀態的線程池不會再接收新任務,但還會處理已經提交到阻塞隊列中等待處理的任務;
STOP:該狀態的線程池不會再接收新任務,不會處理在阻塞隊列中等待的任務,而且還會中斷正在運行的任務;
TIDYING:所有任務都被終止了,workerCount為0,為此狀態時還將調用terminated()方法;
TERMINATED:terminated()方法調用完成后變成此狀態。
幾個狀態相關的方法:
runStateOf(int c)?方法:c & 高3位為1,低29位為0的~CAPACITY,用于獲取高3位保存的線程池狀態
workerCountOf(int c)?方法:c & 高3位為0,低29位為1的CAPACITY,用于獲取低29位的線程數量
ctlOf(int rs, int wc)?方法:參數rs表示runState,參數wc表示workerCount,即根據runState和workerCount打包合并成ctl
也就是說32位含義:(高三位表示狀態)+ (低29位表示線程數量)。
接下來分析源碼:
?
2. execute代碼
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. 如果運行的線程少于corePoolSize, * 嘗試開啟一個新線程去運行command,command作為這個線程的第一個任務,并運行 * * 2. 如果任務成功放入隊列,我們仍需要一個雙重校驗去確認是否應該新建一個線程 *(因為可能存在有些線程在我們上次檢查后死了),或者進入這個方法后,pool被關閉了 * 所以我們需要再次檢查state,如果線程池停止了需要回滾入隊列, * 如果池中沒有線程了,新開啟 一個線程 * * 3. 如果無法將任務入隊列(可能隊列滿了),需要新開區一個線程 * 如果失敗了,說明線程池shutdown或者飽和了,所以我們拒絕任務 */ // 1.當運行的線程少于corePoolSize, // 則直接執行command任務,addworker(command,true)會產生一個新線程來執行這個任務 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 2. 線程池處于RUNNING狀態,并將任務放入workQueue隊列,但不執行addWorker(表明不創建新的線程) // 雙重校驗可防止添加任務到workQueue隊列后,線程池狀態由于意外等原因處于非RUNNING狀態, // 此時就需要從workQueue隊列remove掉這個任務 // 注:offer方法不會阻塞,如果不能插入隊列直接返回false。(有可能造成數據丟失?這里不會,也就是說阻塞隊列滿了) if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3. 如果線程池不是running狀態或者無法入隊列,執行線程池的飽和策略 else if (!addWorker(command, false)) reject(command); }從上面代碼可知,java線程池在任務比較少時(當運行的線程少于corePoolSize),直接通過addWorker來執行任務,當任務比較多時,使用了阻塞隊列,阻塞隊列里存放的是Worker對象,Worker類是ThreadPoolExecutor的一個內部類,它實現了Runable接口,具有線程的功能。同時還繼承了AbstractQueuedSynchronizer(AQS),因此也具有鎖的功能。那么ThreadPoolExecutor中如何去執行阻塞隊列里面的Worker任務的呢?首先我們來分析一下doWorker,看它是如何執行任務,以及如何觸發執行阻塞隊列里面的任務的。
?
3. doWorker代碼
doWorker的的作用首先是創建線程,然后執行任務,源碼如下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 獲取線程池運行狀態, // 線程池的運行狀態:runnbale=-1,shutdown=0,stop=1,tidying=2,terminated=3 int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // CAS算法 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 如果添加任務成功,則跳出retry,也就是跳出整個循環體 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); // 通過線程池的ThreadFactory創建一個線程,用于執行這個firstTask任務 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 說明:(rs == SHUTDOWN && firstTask == null)可能是workQueue中仍有未執行完成的任務,// 創建沒有初始任務的worker線程執行 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 提前檢查t線程是否啟動,如果是就拋非法線程狀態異常 if (t.isAlive()) throw new IllegalThreadStateException(); // workQueue隊列中添加Worker對象 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 往HashSet中添加worker成功,啟動線程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
代碼看起來有點長,但只做了兩件事:
1)用循環CAS操作來將線程數加1;
2)新建一個線程并啟執行這個任務。
代碼中使用的retry,它類似與goto, 用于控制跳出循環體,retry可以隨意命名,只要遵循java的命名規則即可。
CAS會使用循環機制,當存在多線程的情況下,通過比較與交換,其它線程通過循環可以的更新最新值。關于CAS可以參考《深入淺出CAS》
在上面源碼中可以看到,addWorker會用當前firstTask創建一個Worker對象,相當于對firstTask的包裝,然后用Worker對象作為firstTask創建一個Thread,該Thread保存在Worker的thread成員變量中。在addWorker中通過t.start()啟動了這個線程,線程中執行runWorker方法。
?
4. 內部類Worker
那么ThreadPoolExecutor中如何去執行阻塞隊列里面的Worker任務的呢?看到這里好像還是沒有答案。那接著分析Worker這個內部類:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. */ Worker(Runnable firstTask) { // 設置AQS的同步狀態,大于0代表鎖已經被獲取 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { // 調用ThreadPoolExecutor的runworker方法 runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }在addWorker中通過t.start()啟動了這個線程,線程中執行runWorker方法。
?
5. runWorker代碼
到目前為止還是沒有涉及到阻塞隊列!可是到runWorker中就可以看到啦!
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }上面代碼關鍵點是while循環和getTask()方法,通過循環不斷的調用getTask()從阻塞隊列中獲取任務,通過這個方法,它與阻塞隊列建立橋梁。目前我們已經知道當添加任務數量大于coolPoolSize(且小于maximumPoolSize)的時候,并不會創建線程,但是由于在任務數量小于coolPoolSize之前調用了addWorker并觸發t.star()執行,從而調用了runWorker,通過循環不斷的調用getTask()從阻塞隊列中獲取任務,如果getTask()返回不為null,則上鎖,執行任務,任務執行完成之后解鎖。如果getTask()返回null,改變completedAbrutly狀態,然后調用processWorkerExit() 退出worker線程。
?
6. getTask代碼
由第5點引出了getTask方法。
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }getTask中主要看獲取任務的代碼如下:
6. 小結
本文只是對線程池正常的工作流程進行了分析,并沒有對線程池shutdown或者stop的情況進行分析,這些部分涉及到AQS等并發技術,這部分比較復雜,感興趣可以更加深入研究一下。
參考:
轉載于:https://www.cnblogs.com/chenjunjie12321/p/9515511.html
總結
以上是生活随笔為你收集整理的ThreadPoolExecutor的execute源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python全栈 MongoDB 数据库
- 下一篇: SpringBoot基础入门