[并发编程] - Executor框架#ThreadPoolExecutor源码解读03
文章目錄
- Pre
- execute源碼分析
- addWorker()解讀
- Worker解讀
 
Pre
[并發(fā)編程] - Executor框架#ThreadPoolExecutor源碼解讀02
 說了一堆結(jié)論性的東西,作為開發(fā)人員著實是不過癮,那這里我們就來剖根問底來看下線程池是如何工作的。
execute源碼分析
ThreadPoolExecutor te = new ThreadPoolExecutor(5,10,500,TimeUnit.SECONDS,new ArrayBlockingQueue(5));for (int i = 0; i < 6; i++) {te.submit(()->{System.out.println("i m task :"+Thread.currentThread().getName());});}使用ThreadPoolExecutor 自定義了一個線程池
參數(shù)對應(yīng)如下
int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue調(diào)用了 AbstractExecutorService#submit
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}最核心的方法 execute ,由子類ThredPoolExecutor實現(xiàn)
 
主要的流程,注釋中也寫的很清楚了
/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/簡單來說,在執(zhí)行execute()方法時如果狀態(tài)一直是RUNNING時,的執(zhí)行過程如下:
Note : 這里要注意一下addWorker(null, false);,也就是創(chuàng)建一個線程,但并沒有傳入任務(wù),因為任務(wù)已經(jīng)被添加到workQueue中了,所以worker在執(zhí)行的時候,會直接從workQueue中獲取任務(wù)。所以,在workerCountOf(recheck) == 0時執(zhí)行addWorker(null, false);也是為了保證線程池在RUNNING狀態(tài)下必須要有一個線程來執(zhí)行任務(wù)。
addWorker()解讀
private boolean addWorker(Runnable firstTask, boolean core) {}addWorker方法的主要工作是在線程池中創(chuàng)建一個新的線程并執(zhí)行,
- firstTask參數(shù) 用于指定新增的線程執(zhí)行的第一個任務(wù),
- core參數(shù)為true表示在新增線程時會判斷當(dāng)前活動線程數(shù)是否少于corePoolSize,false表示新增線程前需要判斷當(dāng)前活動線程數(shù)是否少于maximumPoolSize
Worker解讀
addWorker中多次提到了這個Work這個類, 其實就是 線程池中的每一個線程被封裝成一個Worker對象,ThreadPool維護(hù)的其實就是一組Worker對象
ThreadPoolExector中內(nèi)部類 Worker
 
繼承了AQS,并實現(xiàn)了Runnable接口 ,
 兩個比較重要的屬性
在調(diào)用構(gòu)造方法時,需要把任務(wù)傳入,這里通過getThreadFactory().newThread(this);來新建一個線程,newThread方法傳入的參數(shù)是this,因為Worker本身繼承了Runnable接口,也就是一個線程,所以一個Worker對象在啟動的時候會調(diào)用Worker類中的run方法。
Worker繼承了AQS,使用AQS來實現(xiàn)獨占鎖的功能。為什么不使用ReentrantLock來實現(xiàn)呢?可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的
總結(jié)
以上是生活随笔為你收集整理的[并发编程] - Executor框架#ThreadPoolExecutor源码解读03的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: Spring5源码 - 构建源码环境
- 下一篇: Spring5源码 - 01 BeanD
