Java并发:线程池详解(ThreadPoolExecutor)
前言
現在在實現異步時,基本都是使用線程池來實現,線程池在工作應用的還是比較頻繁的,本文將就線程池的使用、相關原理和主要方法源碼進行深入講解學習。
線程池的基本使用
package com.joonwhee.concurrent;
?
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
?
/**
?* 線程池的基本使用
?* @author JoonWhee
?* @Date 2018年1月21日
?*/
public class ThreadPoolExecutorTest {
?
? ? /**
? ? ?* 創建一個線程池(完整入參):?
? ? ?* 核心線程數為5 (corePoolSize),?
? ? ?* 最大線程數為10 (maximumPoolSize),?
? ? ?* 存活時間為60分鐘(keepAliveTime),?
? ? ?* 工作隊列為LinkedBlockingQueue (workQueue),
? ? ?* 線程工廠為默認的DefaultThreadFactory (threadFactory),?
? ? ?* 飽和策略(拒絕策略)為AbortPolicy: 拋出異常(handler).
? ? ?*/
? ? private static ExecutorService THREAD_POOL = new ThreadPoolExecutor(5, 10, 60, TimeUnit.MINUTES,
? ? ? ? ? ? new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(),
? ? ? ? ? ? new ThreadPoolExecutor.AbortPolicy());
?
? ? /**
? ? ?* 只有一個線程的線程池 沒有超時時間, 工作隊列使用無界的LinkedBlockingQueue
? ? ?*/
? ? private static ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
? ? // private static ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());
?
? ? /**
? ? ?* 有固定線程的線程池(即corePoolSize = maximumPoolSize) 沒有超時時間,
? ? ?* 工作隊列使用無界的LinkedBlockingQueue
? ? ?*/
? ? private static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
? ? // private static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5, Executors.defaultThreadFactory());
?
? ? /**
? ? ?* 大小不限的線程池 核心線程數為0, 最大線程數為Integer.MAX_VALUE, 存活時間為60秒 該線程池可以無限擴展,
? ? ?* 并且當需求降低時會自動收縮, 工作隊列使用同步移交SynchronousQueue.
? ? ?*/
? ? private static ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
? ? // private static ExecutorService cachedThreadPool = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
?
? ? /**
? ? ?* 給定的延遲之后運行任務, 或者定期執行任務的線程池
? ? ?*/
? ? private static ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
? ? // private static ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5, Executors.defaultThreadFactory());
?
? ? public static void main(String args[]) throws Exception {
?
? ? ? ? /**
? ? ? ? ?* 例子1: 沒有返回結果的異步任務
? ? ? ? ?*/
? ? ? ? THREAD_POOL.submit(new Runnable() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void run() {
? ? ? ? ? ? ? ? // do something
? ? ? ? ? ? ? ? System.out.println("沒有返回結果的異步任務");
? ? ? ? ? ? }
? ? ? ? });
? ? ? ??
? ? ? ? /**
? ? ? ? ?* 例子2: 有返回結果的異步任務
? ? ? ? ?*/
? ? ? ? Future<List<String>> future = THREAD_POOL.submit(new Callable<List<String>>() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public List<String> call() {
? ? ? ? ? ? ? ? List<String> result = new ArrayList<>();
? ? ? ? ? ? ? ? result.add("JoonWhee");
? ? ? ? ? ? ? ? return result;
? ? ? ? ? ? }
? ? ? ? });
? ? ? ? List<String> result = future.get(); // 獲取返回結果
? ? ? ? System.out.println("有返回結果的異步任務: " + result);
? ? ? ??
? ? ? ? /**
? ? ? ? ?* 例子3:?
? ? ? ? ?* 有延遲的, 周期性執行異步任務
? ? ? ? ?* 本例子為: 延遲1秒, 每2秒執行1次
? ? ? ? ?*/
? ? ? ? scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void run() {
? ? ? ? ? ? ? ? System.out.println("this is " + Thread.currentThread().getName());
? ? ? ? ? ? }
?
? ? ? ? }, 1, 2, TimeUnit.SECONDS);
? ? ? ??
? ? ? ? /**
? ? ? ? ?* 例子4: FutureTask的使用
? ? ? ? ?*/
? ? ? ? Callable<String> task = new Callable<String>() {
? ? ? ? ? ? public String call() {
? ? ? ? ? ? ? ? return "JoonWhee";
? ? ? ? ? ? }
? ? ? ? }; ? ? ?
? ? ? ? FutureTask<String> futureTo = new FutureTask<String>(task);
? ? ? ? THREAD_POOL.submit(futureTo);
? ? ? ? System.out.println(futureTo.get()); // 獲取返回結果
// ? ? ? ?System.out.println(futureTo.get(3, TimeUnit.SECONDS)); ?// 超時時間為3秒
? ? }
}?
線程池的定義和優點
線程池,從字面含義來看,是指管理一組同構工作線程的資源池。線程池是與工作隊列密切相關的,其中在工作隊列中保存了所有等待執行的任務。工作者線程的任務很簡單:從工作隊列中獲取一個任務,執行任務,然后返回線程池并等待下一個任務。
“在線程池中執行任務“”比“為每個線程分配一個任務”優勢更多。通過重用現有的線程而不是創建線程,可以在處理多個請求時分攤在線程創建和銷毀過程中產生的巨大開銷。另一個額外的好處是,當請求到達時,工作線程通常已經存在,因此不會由于等待創建線程而延遲任務的執行,從而提高了響應性。通過適當的調整線程池的大小,可以創建足夠的線程以便使處理器保持忙碌狀態,同時還可以防止過多線程相互競爭資源而使應用程序耗盡內存或失敗。
線程池的工作流程
默認情況下,創建完線程池后并不會立即創建線程, 而是等到有任務提交時才會創建線程來進行處理。(除非調用prestartCoreThread或prestartAllCoreThreads方法)?
當線程數小于核心線程數時,每提交一個任務就創建一個線程來執行,即使當前有線程處于空閑狀態,直到當前線程數達到核心線程數。??
當前線程數達到核心線程數時,如果這個時候還提交任務,這些任務會被放到隊列里,等到線程處理完了手頭的任務后,會來隊列中取任務處理。??
當前線程數達到核心線程數并且隊列也滿了,如果這個時候還提交任務,則會繼續創建線程來處理,直到線程數達到最大線程數。
當前線程數達到最大線程數并且隊列也滿了,如果這個時候還提交任務,則會觸發飽和策略。?
如果某個線程的控線時間超過了keepAliveTime,那么將被標記為可回收的,并且當前線程池的當前大小超過了核心線程數時,這個線程將被終止。
工作隊列
如果新請求的到達速率超過了線程池的處理速率,那么新到來的請求將累積起來。在線程池中,這些請求會在一個由Executor管理的Runnable隊列中等待,而不會像線程那樣去競爭CPU資源。常見的工作隊列有以下幾種,前三種用的最多。
ArrayBlockingQueue:列表形式的工作隊列,必須要有初始隊列大小,有界隊列,先進先出。
LinkedBlockingQueue:鏈表形式的工作隊列,可以選擇設置初始隊列大小,有界/無界隊列,先進先出。
SynchronousQueue:SynchronousQueue不是一個真正的隊列,而是一種在線程之間移交的機制。要將一個元素放入SynchronousQueue中, 必須有另一個線程正在等待接受這個元素. 如果沒有線程等待,并且線程池的當前大小小于最大值,那么ThreadPoolExecutor將創建 一個線程, 否則根據飽和策略,這個任務將被拒絕。使用直接移交將更高效,因為任務會直接移交 給執行它的線程,而不是被首先放在隊列中, 然后由工作者線程從隊列中提取任務. 只有當線程池是無解的或者可以拒絕任務時,SynchronousQueue才有實際價值.
PriorityBlockingQueue:優先級隊列,有界隊列,根據優先級來安排任務,任務的優先級是通過自然順序或Comparator(如果任務實現了Comparator)來定義的。
DelayedWorkQueue:延遲的工作隊列,無界隊列。
飽和策略(拒絕策略)
當有界隊列被填滿后,飽和策略開始發揮作用。ThreadPoolExecutor的飽和策略可以通過調用setRejectedExecutionHandler來修改。(如果某個任務被提交到一個已被關閉的Executor時,也會用到飽和策略)。飽和策略有以下四種,一般使用默認的AbortPolicy。
AbortPolicy:中止策略。默認的飽和策略,拋出未檢查的RejectedExecutionException。調用者可以捕獲這個異常,然后根據需求編寫自己的處理代碼。
DiscardPolicy:拋棄策略。當新提交的任務無法保存到隊列中等待執行時,該策略會悄悄拋棄該任務。
DiscardOldestPolicy:拋棄最舊的策略。當新提交的任務無法保存到隊列中等待執行時,則會拋棄下一個將被執行的任務,然后嘗試重新提交新的任務。(如果工作隊列是一個優先隊列,那么“拋棄最舊的”策略將導致拋棄優先級最高的任務,因此最好不要將“拋棄最舊的”策略和優先級隊列放在一起使用)。
CallerRunsPolicy:調用者運行策略。該策略實現了一種調節機制,該策略既不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者(調用線程池執行任務的主線程),從而降低新任務的流程。它不會在線程池的某個線程中執行新提交的任務,而是在一個調用了execute的線程中執行該任務。當線程池的所有線程都被占用,并且工作隊列被填滿后,下一個任務會在調用execute時在主線程中執行(調用線程池執行任務的主線程)。由于執行任務需要一定時間,因此主線程至少在一段時間內不能提交任務,從而使得工作者線程有時間來處理完正在執行的任務。在這期間,主線程不會調用accept,因此到達的請求將被保存在TCP層的隊列中。如果持續過載,那么TCP層將最終發現它的請求隊列被填滿,因此同樣會開始拋棄請求。當服務器過載后,這種過載情況會逐漸向外蔓延開來——從線程池到工作隊列到應用程序再到TCP層,最終達到客戶端,導致服務器在高負載下實現一種平緩的性能降低。
線程工廠
每當線程池需要創建一個線程時,都是通過線程工廠方法來完成的。在ThreadFactory中只定義了一個方法newThread,每當線程池需要創建一個新線程時都會調用這個方法。Executors提供的線程工廠有兩種,一般使用默認的,當然如果有特殊需求,也可以自己定制。
DefaultThreadFactory:默認線程工廠,創建一個新的、非守護的線程,并且不包含特殊的配置信息。
PrivilegedThreadFactory:通過這種方式創建出來的線程,將與創建privilegedThreadFactory的線程擁有相同的訪問權限、 AccessControlContext、ContextClassLoader。如果不使用privilegedThreadFactory, 線程池創建的線程將從在需要新線程時調用execute或submit的客戶程序中繼承訪問權限。
自定義線程工廠:可以自己實現ThreadFactory接口來定制自己的線程工廠方法。
ThreadPoolExecutor源碼解析
幾個點
了解這幾個點,有助于你閱讀下面的源碼解釋。
下面的源碼解讀中提到的運行狀態就是runState,有效的線程數就是workerCount,內容比較多,所以可能兩種寫法都用到。
運行狀態的一些定義:RUNNING:接受新任務并處理排隊任務; SHUTDOWN:不接受新任務,但處理排隊任務; STOP:不接受新任務,不處理排隊任務,并中斷正在進行的任務;TIDYING:所有任務已經終止,workerCount為零,線程轉換到狀態TIDYING將運行terminate()鉤子方法;TERMINATED:terminated()已經完成,該方法執行完畢代表線程池已經完全終止。
運行狀態之間并不是隨意轉換的,大多數狀態都只能由固定的狀態轉換而來,轉換關系見第4點~第8點。
RUNNING - > SHUTDOWN:在調用shutdown()時,可能隱含在finalize()。
(RUNNING or SHUTDOWN) -> STOP:調用shutdownNow()。
SHUTDOWN - > TIDYING:當隊列和線程池都是空的時。
STOP - > TIDYING:當線程池為空時。
TIDYING - > TERMINATED:當terminate()方法完成時。
基礎屬性(很重要)
/**
?* 主池控制狀態ctl是包含兩個概念字段的原子整數: workerCount:指有效的線程數量;
?* runState:指運行狀態,運行,關閉等。為了將workerCount和runState用1個int來表示,
?* 我們限制workerCount范圍為(2 ^ 29) - 1,即用int的低29位用來表示workerCount,
?* 用int的高3位用來表示runState,這樣workerCount和runState剛好用int可以完整表示。
?*/
// 初始化時有效的線程數為0, 此時ctl為: 1010 0000 0000 0000 0000 0000 0000 0000?
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));?
// 高3位用來表示運行狀態,此值用于運行狀態向左移動的位數,即29位
private static final int COUNT_BITS = Integer.SIZE - 3; ? ??
// 線程數容量,低29位表示有效的線程數, 0001 1111 1111 1111 1111 1111 1111 1111
private static final int CAPACITY ? = (1 << COUNT_BITS) - 1;
?
/**
?* 大小關系:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED,
?* 源碼中頻繁使用大小關系來作為條件判斷。
?* 1010 0000 0000 0000 0000 0000 0000 0000 運行
?* 0110 0000 0000 0000 0000 0000 0000 0000 關閉
?* 0110 0000 0000 0000 0000 0000 0000 0000 停止
?* 0110 0000 0000 0000 0000 0000 0000 0000 整理
?* 0110 0000 0000 0000 0000 0000 0000 0000 終止
?*/
private static final int RUNNING ? ?= -1 << COUNT_BITS; // 運行
private static final int SHUTDOWN ? = ?0 << COUNT_BITS; // 關閉
private static final int STOP ? ? ? = ?1 << COUNT_BITS; // 停止
private static final int TIDYING ? ?= ?2 << COUNT_BITS; // 整理
private static final int TERMINATED = ?3 << COUNT_BITS; // 終止
?
/**
?* 得到運行狀態:入參c為ctl的值,~CAPACITY高3位為1低29位全為0,?
?* 因此運算結果為ctl的高3位, 也就是運行狀態
?*/
private static int runStateOf(int c) ? ? { return c & ~CAPACITY; } ?
/**
?* 得到有效的線程數:入參c為ctl的值, CAPACITY高3為為0,?
?* 低29位全為1, 因此運算結果為ctl的低29位, 也就是有效的線程數
?*/
private static int workerCountOf(int c) ?{ return c & CAPACITY; } ??
/**
?* 得到ctl的值:高3位的運行狀態和低29位的有效線程數進行或運算,?
?* 組合成一個完成的32位數
?*/
private static int ctlOf(int rs, int wc) { return rs | wc; } ? ?
?
// 狀態c是否小于s
private static boolean runStateLessThan(int c, int s) {?
? ? return c < s;
}
// 狀態c是否大于等于s
private static boolean runStateAtLeast(int c, int s) {
? ? return c >= s;
}
// 狀態c是否為RUNNING(小于SHUTDOWN的狀態只有RUNNING)
private static boolean isRunning(int c) {
? ? return c < SHUTDOWN;
}
?
// 使用CAS增加一個有效的線程
private boolean compareAndIncrementWorkerCount(int expect) { ? ?
? ? return ctl.compareAndSet(expect, expect + 1);
}
?
// 使用CAS減少一個有效的線程
private boolean compareAndDecrementWorkerCount(int expect) { ? ?
? ? return ctl.compareAndSet(expect, expect - 1);
}
?
// 減少一個有效的線程
private void decrementWorkerCount() {
? ? do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
?
// 工作隊列
private final BlockingQueue<Runnable> workQueue; ? ?
?
// 鎖
private final ReentrantLock mainLock = new ReentrantLock();?
?
// 包含線程池中的所有工作線程,只有在mainLock的情況下才能訪問,Worker集合
private final HashSet<Worker> workers = new HashSet<Worker>();
?
private final Condition termination = mainLock.newCondition();
?
// 跟蹤線程池的最大到達大小,僅在mainLock下訪問
private int largestPoolSize;
?
// 總的完成的任務數
private long completedTaskCount;
?
// 線程工廠,用于創建線程
private volatile ThreadFactory threadFactory;
?
// 拒絕策略
private volatile RejectedExecutionHandler handler;
?
?
/**
?* 線程超時時間,當線程數超過corePoolSize時生效,?
?* 如果有線程空閑時間超過keepAliveTime, 則會被終止
?*/
private volatile long keepAliveTime; ? ?
?
// 是否允許核心線程超時,默認false,false情況下核心線程會一直存活。
private volatile boolean allowCoreThreadTimeOut;
?
// 核心線程數
private volatile int corePoolSize;
?
// 最大線程數
private volatile int maximumPoolSize;
?
// 默認飽和策略(拒絕策略), 拋異常
private static final RejectedExecutionHandler defaultHandler =?
? ? new AbortPolicy();
?
private static final RuntimePermission shutdownPerm =
? ? new RuntimePermission("modifyThread");
?
/**
?* Worker類,每個Worker包含一個線程、一個初始任務、一個任務計算器
?*/
private final class Worker ??
? ? extends AbstractQueuedSynchronizer
? ? implements Runnable
{
? ? private static final long serialVersionUID = 6138294804551838833L;
?
? ? final Thread thread; ? ?// Worker對應的線程
? ? Runnable firstTask; // 運行的初始任務。
? ? volatile long completedTasks; ? // 每個線程的任務計數器
?
? ? Worker(Runnable firstTask) {
? ? ? ? setState(-1); // 禁止中斷,直到runWorker
? ? ? ? this.firstTask = firstTask; // 設置為初始任務
? ? ? ? // 使用當前線程池的線程工廠創建一個線程
? ? ? ? this.thread = getThreadFactory().newThread(this); ?
? ? }
?
? ? // 將主運行循環委托給外部runWorker
? ? public void run() {
? ? ? ? runWorker(this);
? ? }
?
? ? // Lock methods
? ? //
? ? // The value 0 represents the unlocked state.
? ? // The value 1 represents the locked state.
? ? /**
? ? ?* 通過AQS的同步狀態來實現鎖機制。state為0時代表鎖未被獲取(解鎖狀態),
? ? ?* state為1時代表鎖已經被獲取(加鎖狀態)。
? ? ?*/
? ? protected boolean isHeldExclusively() { //?
? ? ? ? return getState() != 0;?
? ? }
? ? protected boolean tryAcquire(int unused) { ?// 嘗試獲取鎖
? ? ? ? if (compareAndSetState(0, 1)) { // 使用CAS嘗試將state設置為1,即嘗試獲取鎖
? ? ? ? ? ? // 成功將state設置為1,則當前線程擁有獨占訪問權
? ? ? ? ? ? setExclusiveOwnerThread(Thread.currentThread()); ? ?
? ? ? ? ? ? return true;
? ? ? ? }
? ? ? ? return false;
? ? }
? ? protected boolean tryRelease(int unused) { ?// 嘗試釋放鎖
? ? ? ? setExclusiveOwnerThread(null); ?// 釋放獨占訪問權:即將獨占訪問線程設為null
? ? ? ? setState(0); ? ?// 解鎖:將state設置為0
? ? ? ? return true;
? ? }
? ? public void lock() ? ? ? ?{ acquire(1); } ? // 加鎖
? ? public boolean tryLock() ?{ return tryAcquire(1); } // 嘗試加鎖
? ? public void unlock() ? ? ?{ release(1); } ? // 解鎖
? ? public boolean isLocked() { return isHeldExclusively(); } ?// 是否為加鎖狀態?
? ? void interruptIfStarted() { // 如果線程啟動了,則進行中斷
? ? ? ? Thread t;
? ? ? ? if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? t.interrupt();
? ? ? ? ? ? } catch (SecurityException ignore) {
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
execute方法
使用線程池的submit方法提交任務時,會走到該方法,該方法也是線程池最重要的方法。
public void execute(Runnable command) {
? ? if (command == null) ? ?// 為空校驗
? ? ? ? throw new NullPointerException();
?
? ? int c = ctl.get(); ?// 拿到當前的ctl值
? ? if (workerCountOf(c) < corePoolSize) { ?// 如果有效的線程數小于核心線程數
? ? ? ? if (addWorker(command, true)) ? // 則新建一個線程來處理任務(核心線程)
? ? ? ? ? ? return;
? ? ? ? c = ctl.get(); ?// 拿到當前的ctl值
? ? }
? ? // 走到這里說明有效的線程數已經 >= 核心線程數
? ? if (isRunning(c) && workQueue.offer(command)) {// 如果當前狀態是運行, 嘗試將任務放入工作隊列
? ? ? ? int recheck = ctl.get(); ? ?// 再次拿到當前的ctl值
? ? ? ? // 如果再次檢查狀態不是運行, 則將剛才添加到工作隊列的任務移除
? ? ? ? if (! isRunning(recheck) && remove(command))?
? ? ? ? ? ? reject(command); ? ?// 并調用拒絕策略
? ? ? ? else if (workerCountOf(recheck) == 0) // 如果再次檢查時,有效的線程數為0,?
? ? ? ? ? ? addWorker(null, false); // 則新建一個線程(非核心線程)
? ? }
? ? // 走到這里說明工作隊列已滿
? ? else if (!addWorker(command, false))//嘗試新建一個線程來處理任務(非核心)
? ? ? ? reject(command); ? ?// 如果失敗則調用拒絕策略
}
該方法就是對應上文的線程池的工作流程。主要調用到的方法為addWorker(見下文addWorker方法解讀)。
addWorker方法
/**
?* 添加一個Worker,Worker包含一個線程和一個任務,由這個線程來執行該任務。
?*/
private boolean addWorker(Runnable firstTask, boolean core) { ??
? ? retry:
? ? for (;;) {
? ? ? ? int c = ctl.get(); ?// c賦值為ctl
? ? ? ? int rs = runStateOf(c); // rs賦值為運行狀態
? ? ? ? /**
? ? ? ? ?* 1.如果池停止或有資格關閉,則此方法返回false;
? ? ? ? ?* 如果線程工廠在被詢問時未能創建線程,它也返回false。?
? ? ? ? ?* 包括以下5種情況:
? ? ? ? ?* 1).rs為RUNNING,通過校驗。
? ? ? ? ?* 2).rs為STOP或TIDYING或TERMINATED,返回false。
? ? ? ? ?* (STOP、TIDYING、TERMINATED:已經停止進入最后清理終止,不接受任務不處理隊列任務)
? ? ? ? ?* 3).rs為SHUTDOWN,提交的任務不為空,返回false。
? ? ? ? ?* (SHUTDOWN:不接受任務但是處理隊列任務,因此任務不為空返回false)
? ? ? ? ?* 4).rs為SHUTDOWN,提交的任務為空,并且工作隊列為空,返回false。
? ? ? ? ?* (狀態為SHUTDOWN、提交的任務為空、工作隊列為空,則線程池有資格關閉,直接返回false)
? ? ? ? ?* 5).rs為SHUTDOWN,提交的任務為空,并且工作隊列不為空,通過校驗。
? ? ? ? ?* (因為SHUTDOWN狀態下剛好可以處理隊列任務)
? ? ? ? ?*/
? ? ? ? if (rs >= SHUTDOWN &&
? ? ? ? ? ? ! (rs == SHUTDOWN &&
? ? ? ? ? ? ? ?firstTask == null &&
? ? ? ? ? ? ? ?! workQueue.isEmpty()))
? ? ? ? ? ? return false;
?
? ? ? ? for (;;) {
? ? ? ? ? ? int wc = workerCountOf(c); ?// 拿到有效的線程數
? ? ? ? ? ? // 校驗有效的線程數是否超過閾值
? ? ? ? ? ? if (wc >= CAPACITY ||
? ? ? ? ? ? ? ? wc >= (core ? corePoolSize : maximumPoolSize))
? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? // 使用CAS將workerCount+1, 修改成功則跳出循環,否則進入下面的狀態判斷
? ? ? ? ? ? if (compareAndIncrementWorkerCount(c))
? ? ? ? ? ? ? ? break retry;
? ? ? ? ? ? c = ctl.get(); ?// 重新讀取ctl
? ? ? ? ? ? // 判斷當前運行狀態,如果不等于上面獲取的運行狀態rs,
? ? ? ? ? ? // 說明rs被其他線程修改了,跳到retry重新校驗線程池狀態
? ? ? ? ? ? if (runStateOf(c) != rs)
? ? ? ? ? ? ? ? continue retry;
? ? ? ? ? ? // 走到這里說明compareAndIncrementWorkerCount失敗;?
? ? ? ? ? ? // 重試內部循環(狀態沒變,則繼續內部循環,嘗試使用CAS修改workerCount)
? ? ? ? }
? ? }
?
? ? boolean workerStarted = false; ?// Worker的線程是否啟動
? ? boolean workerAdded = false; ? ?// Worker是否成功增加
? ? Worker w = null;
? ? try {
? ? ? ? w = new Worker(firstTask); ?// 用firstTask和當前線程創建一個Worker
? ? ? ? final Thread t = w.thread; ?// 拿到Worker對應的線程
? ? ? ? if (t != null) {
? ? ? ? ? ? final ReentrantLock mainLock = this.mainLock;
? ? ? ? ? ? mainLock.lock(); ? ?// 加鎖
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? // Recheck while holding lock.
? ? ? ? ? ? ? ? // Back out on ThreadFactory failure or if
? ? ? ? ? ? ? ? // shut down before lock acquired.
? ? ? ? ? ? ? ? int rs = runStateOf(ctl.get()); // 加鎖的情況下重新獲取當前的運行狀態
?
? ? ? ? ? ? ? ? // 如果當前的運行狀態為RUNNING,
? ? ? ? ? ? ? ? // 或者當前的運行狀態為SHUTDOWN并且firstTask為空,則通過校驗
? ? ? ? ? ? ? ? if (rs < SHUTDOWN ||
? ? ? ? ? ? ? ? ? ? (rs == SHUTDOWN && firstTask == null)) {
? ? ? ? ? ? ? ? ? ? if (t.isAlive()) ? ?// 預先校驗線程是可以啟動的
? ? ? ? ? ? ? ? ? ? ? ? throw new IllegalThreadStateException();
? ? ? ? ? ? ? ? ? ? workers.add(w); // 將剛創建的worker添加到工作者列表
? ? ? ? ? ? ? ? ? ? int s = workers.size();
? ? ? ? ? ? ? ? ? ? if (s > largestPoolSize)
? ? ? ? ? ? ? ? ? ? ? ? largestPoolSize = s;
? ? ? ? ? ? ? ? ? ? workerAdded = true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? mainLock.unlock();
? ? ? ? ? ? }
? ? ? ? ? ? if (workerAdded) { ?// 如果Worker添加成功,則啟動線程執行
? ? ? ? ? ? ? ? t.start();
? ? ? ? ? ? ? ? workerStarted = true;
? ? ? ? ? ? }
? ? ? ? }
? ? } finally {
? ? ? ? if (! workerStarted) ? ?// 如果Worker的線程沒有成功啟動
? ? ? ? ? ? addWorkerFailed(w); // 則進行回滾, 移除之前添加的Worker
? ? }
? ? return workerStarted;
}
該方法主要目的就是使用入參中的firstTask和當前線程添加一個Worker,前面的for循環主要是對當前線程池的運行狀態和有效的線程數進行一些校驗,校驗邏輯比較繞,可以參考注釋進行理解。該方法涉及到的其他方法有addWorkerFailed(見下文addWorkerFailed源碼解讀);還有就是Worker的線程啟動時,會調用Worker里的run方法,執行runWorker(this)方法(見下文runWorker源碼解讀)。
addWorkerFailed方法
/**
?* Rolls back the worker thread creation.
?* - removes worker from workers, if present
?* - decrements worker count
?* - rechecks for termination, in case the existence of this
?* ? worker was holding up termination
?*/
private void addWorkerFailed(Worker w) { ? ?// 回滾Worker的添加,就是將Worker移除
? ? final ReentrantLock mainLock = this.mainLock;
? ? mainLock.lock();
? ? try {
? ? ? ? if (w != null)
? ? ? ? ? ? workers.remove(w); ?// 移除Worker
? ? ? ? decrementWorkerCount(); // 有效線程數-1
? ? ? ? tryTerminate(); // 有worker線程移除,可能是最后一個線程退出需要嘗試終止線程池
? ? } finally {
? ? ? ? mainLock.unlock();
? ? }
}
該方法很簡單,就是移除入參中的Worker并將workerCount-1,最后調用tryTerminate嘗試終止線程池,tryTerminate見下文對應方法源碼解讀。
runWorker方法
上文addWork方法里說道,當Worker里的線程啟動時,就會調用該方法。
/**
?* Worker的線程開始執行任務
?*/
final void runWorker(Worker w) {
? ? Thread wt = Thread.currentThread(); // 獲取當前線程
? ? Runnable task = w.firstTask; ? ?// 拿到Worker的初始任務
? ? w.firstTask = null;
? ? w.unlock(); // allow interrupts
? ? boolean completedAbruptly = true; ? // Worker是不是因異常而死亡
? ? try {
? ? ? ? while (task != null || (task = getTask()) != null) {// Worker取任務執行
? ? ? ? ? ? w.lock(); ? // 加鎖
? ? ? ? ? ? /**如果線程池停止,確保線程中斷; 如果不是,確保線程不被中斷。
? ? ? ? ? ? ?* 在第二種情況下進行重新檢查,以便在清除中斷的同時處理shutdownNow競爭
? ? ? ? ? ? ?* 線程池停止指運行狀態為STOP/TIDYING/TERMINATED中的一種
? ? ? ? ? ? ?*/
? ? ? ? ? ? if ((runStateAtLeast(ctl.get(), STOP) || ? ?// 判斷線程池運行狀態
? ? ? ? ? ? ? ? ?(Thread.interrupted() && ? // 重新檢查
? ? ? ? ? ? ? ? ? runStateAtLeast(ctl.get(), STOP))) && // 再次判斷線程池運行狀態
? ? ? ? ? ? ? ? !wt.isInterrupted())// 走到這里代表線程池運行狀態為停止,檢查wt是否中斷
? ? ? ? ? ? ? ? wt.interrupt(); // 線程池的狀態為停止并且wt不為中斷, 則將wt中斷
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? beforeExecute(wt, task);// 執行beforeExecute(默認空,需要自己重寫)
? ? ? ? ? ? ? ? Throwable thrown = null;
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? task.run(); // 執行任務
? ? ? ? ? ? ? ? } catch (RuntimeException x) {
? ? ? ? ? ? ? ? ? ? thrown = x; throw x; //如果拋異常,則completedAbruptly為true
? ? ? ? ? ? ? ? } catch (Error x) {
? ? ? ? ? ? ? ? ? ? thrown = x; throw x;
? ? ? ? ? ? ? ? } catch (Throwable x) {
? ? ? ? ? ? ? ? ? ? thrown = x; throw new Error(x);
? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? afterExecute(task, thrown);// 執行afterExecute(需要自己重寫)
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? task = null; ? ?// 將執行完的任務清空
? ? ? ? ? ? ? ? w.completedTasks++; // Worker完成任務數+1
? ? ? ? ? ? ? ? w.unlock();
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? completedAbruptly = false; ?// 如果執行到這里,則worker是正常退出
? ? } finally {
? ? ? ? processWorkerExit(w, completedAbruptly);// 調用processWorkerExit方法
? ? }
}
該方法為Worker線程開始執行任務,首先執行當初創建Worker時的初始任務,接著從工作隊列中獲取任務執行。主要涉及兩個方法:獲取任務的方法getTask(見下文getTask源碼解讀)和執行Worker退出的方法processWorkerExit(見下文processWorkerExit源碼解讀)。注:processWorkerExit在處理正常Worker退出時,沒有對workerCount-1,而是在getTask方法中進行workerCount-1。
getTask方法
private Runnable getTask() { ? ?// Worker從工作隊列獲取任務
? ? boolean timedOut = false; // poll方法取任務是否超時
?
? ? for (;;) { ?// 無線循環
? ? ? ? int c = ctl.get(); ?// ctl
? ? ? ? int rs = runStateOf(c); // 當前運行狀態
?
? ? ? ? // 如果線程池運行狀態為停止,或者可以停止(狀態為SHUTDOWN并且隊列為空)
? ? ? ? // 則返回null,代表當前Worker需要移除
? ? ? ? if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { ? ?
? ? ? ? ? ? decrementWorkerCount(); // 將workerCount - 1
? ? ? ? ? ? // 返回null前將workerCount - 1,
? ? ? ? ? ? // 因此processWorkerExit中completedAbruptly=false時無需再減
? ? ? ? ? ? return null;
? ? ? ? }
?
? ? ? ? int wc = workerCountOf(c); ?// 當前的workerCount
?
? ? ? ? // 判斷當前Worker是否可以被移除, 即當前Worker是否可以一直等待任務。
? ? ? ? // 如果allowCoreThreadTimeOut為true,或者workerCount大于核心線程數,
? ? ? ? // 則當前線程是有超時時間的(keepAliveTime),無法一直等待任務。
? ? ? ? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; ? ?
?
? ? ? ? // 如果wc超過最大線程數 或者 當前線程會超時并且已經超時,
? ? ? ? // 并且wc > 1 或者 工作隊列為空,則返回null,代表當前Worker需要移除
? ? ? ? if ((wc > maximumPoolSize || (timed && timedOut))
? ? ? ? ? ? && (wc > 1 || workQueue.isEmpty())) { ? // 確保有Worker可以移除?
? ? ? ? ? ? if (compareAndDecrementWorkerCount(c))
? ? ? ? ? ? ? ? // 返回null前將workerCount - 1,
? ? ? ? ? ? ? ? // 因此processWorkerExit中completedAbruptly=false時無需再減
? ? ? ? ? ? ? ? return null; ? ?
? ? ? ? ? ? continue;
? ? ? ? }
?
? ? ? ? try {
? ? ? ? ? ? // 根據線程是否會超時調用相應的方法,poll為帶超時的獲取任務方法
? ? ? ? ? ? // take()為不帶超時的獲取任務方法,會一直阻塞直到獲取到任務
? ? ? ? ? ? Runnable r = timed ??
? ? ? ? ? ? ? ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
? ? ? ? ? ? ? ? workQueue.take();
? ? ? ? ? ? if (r != null)
? ? ? ? ? ? ? ? return r;
? ? ? ? ? ? timedOut = true; ? ?// 走到這代表當前線程獲取任務超時
? ? ? ? } catch (InterruptedException retry) {
? ? ? ? ? ? timedOut = false; ? // 被中斷
? ? ? ? }
? ? }
}
Worker從工作隊列獲取任務,如果allowCoreThreadTimeOut為false并且??workerCount<=corePoolSize,則這些核心線程永遠存活,并且一直在嘗試獲取工作隊列的任務;否則,線程會有超時時間(keepAliveTime),當在keepAliveTime時間內獲取不到任務,該線程的Worker會被移除。?
Worker移除的過程:getTask方法返回null,導致runWorker方法中跳出while循環,調用processWorkerExit方法將Worker移除。注意:在返回null的之前,已經將workerCount-1,因此在processWorkerExit中,completedAbruptly=false的情況(即正常超時退出)不需要再將workerCount-1。
processWorkerExit方法
private void processWorkerExit(Worker w, boolean completedAbruptly) { ? // Worker的退出
? ? // 如果Worker是異常死亡(completedAbruptly=true),則workerCount-1;
? ? // 如果completedAbruptly為false的時候(正常超時退出),則代表task=getTask()等于null,
? ? // getTask()方法中返回null的地方,都已經將workerCount - 1,所以此處無需再-1
? ? if (completedAbruptly)?
? ? ? ? decrementWorkerCount();
?
? ? final ReentrantLock mainLock = this.mainLock;
? ? mainLock.lock(); ? ?// 加鎖
? ? try {
? ? ? ? completedTaskCount += w.completedTasks; // 該Worker完成的任務數加到總完成的任務數
? ? ? ? workers.remove(w); ?// 移除該Worker
? ? } finally {
? ? ? ? mainLock.unlock();
? ? }
?
? ? tryTerminate(); // 有Worker線程移除,可能是最后一個線程退出,需要嘗試終止線程池
?
? ? int c = ctl.get(); ?// 獲取當前的ctl
? ? if (runStateLessThan(c, STOP)) { ?// 如果線程池的運行狀態還沒停止(RUNNING或SHUTDOWN)
? ? ? ? if (!completedAbruptly) { ? // 如果Worker不是異常死亡
? ? ? ? ? ? // min為線程池的理論最小線程數:如果允許核心線程超時則min為0,否則min為核心線程數
? ? ? ? ? ? int min = allowCoreThreadTimeOut ? 0 : corePoolSize; ? ?
? ? ? ? ? ? // 如果min為0,工作隊列不為空,將min設置為1,確保至少有1個Worker來處理隊列里的任務?
? ? ? ? ? ? if (min == 0 && ! workQueue.isEmpty())
? ? ? ? ? ? ? ? min = 1;
? ? ? ? ? ? // 當前有效的線程數>=min,直接返回;
? ? ? ? ? ? if (workerCountOf(c) >= min)
? ? ? ? ? ? ? ? return; // replacement not needed?
? ? ? ? ? ? // 如果代碼走到這邊,代表workerCountOf(c) < min,此時會走到下面的addWorker方法。
? ? ? ? ? ? // 通過getTask方法我們知道,當allowCoreThreadTimeOut為false
? ? ? ? ? ? // 并且workerCount<=corePoolSize時,是不會走到processWorkerExit方法的。
? ? ? ? ? ? // 因此走到這邊只可能是當前移除的Worker是最后一個Worker,但是此時工作
? ? ? ? ? ? // 隊列還不為空,因此min被設置成了1,所以需要在添加一個Worker來處理工作隊列。
? ? ? ? }
? ? ? ? addWorker(null, false); // 添加一個Worker
? ? }
}
該方法就是執行Worker的退出:統計完成的任務數,將Worker移除,并嘗試終止線程池,最后根據情況決定是否創建一個新的Worker。兩種情況下會創建一個新的Worker:1)被移除的Worker是由于異常而死亡;2)被移除的Worker是最后一個Worker,但是工作隊列還有任務。completedAbruptly=false時,沒有將workerCount-1是因為已經在getTask方法中將workerCount-1。
tryTerminate方法
final void tryTerminate() { // 嘗試終止線程池
? ? for (;;) {
? ? ? ? int c = ctl.get();
? ? ? ? // 只有當前狀態為STOP 或者 SHUTDOWN并且隊列為空,才會嘗試整理并終止
? ? ? ? // 1: 當前狀態為RUNNING,則不嘗試終止,直接返回
? ? ? ? // 2: 當前狀態為TIDYING或TERMINATED,代表有其他線程正在執行終止,直接返回
? ? ? ? // 3: 當前狀態為SHUTDOWN 并且 workQueue不為空,則不嘗試終止,直接返回
? ? ? ? if (isRunning(c) || // 1
? ? ? ? ? ? runStateAtLeast(c, TIDYING) || ?// 2
? ? ? ? ? ? (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) ? // 3
? ? ? ? ? ? return;
? ? ? ? // 走到這代表線程池可以終止(通過上面的校驗)
? ? ? ? // 如果此時有效線程數不為0, 將中斷一個空閑的Worker,以確保關閉信號傳播
? ? ? ? if (workerCountOf(c) != 0) { // Eligible to terminate?
? ? ? ? ? ? interruptIdleWorkers(ONLY_ONE);
? ? ? ? ? ? return;
? ? ? ? }
?
? ? ? ? final ReentrantLock mainLock = this.mainLock;
? ? ? ? mainLock.lock(); ? ?// 加鎖,終止線程池
? ? ? ? try {
? ? ? ? ? ? // 使用CAS將ctl的運行狀態設置為TIDYING,有效線程數設置為0
? ? ? ? ? ? if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { ?
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? terminated(); ? // 供用戶重寫的terminated方法,默認為空
? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? // 將ctl的運行狀態設置為TERMINATED,有效線程數設置為0
? ? ? ? ? ? ? ? ? ? ctl.set(ctlOf(TERMINATED, 0)); ?
? ? ? ? ? ? ? ? ? ? termination.signalAll();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? mainLock.unlock();
? ? ? ? }
? ? ? ? // else retry on failed CAS
? ? }
}
該方法用來嘗試終止線程池,主要在移除Worker后會調用此方法。首先進行一些狀態的校驗,如果通過校驗,則在加鎖的條件下,使用CAS將運行狀態設為TERMINATED,有效線程數設為0。
參考:
ThreadPoolExecutor源碼(JDK 1.8)
---------------------?
作者:程序員囧輝?
來源:CSDN?
原文:https://blog.csdn.net/v123411739/article/details/79124193?
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!
總結
以上是生活随笔為你收集整理的Java并发:线程池详解(ThreadPoolExecutor)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 线程池,这一篇或许就够了
- 下一篇: 线程池的使用(线程池重点解析)