线程池的使用(线程池重点解析)
我們有兩種常見的創建線程的方法,一種是繼承Thread類,一種是實現Runnable的接口,Thread類其實也是實現了Runnable接口。但是我們創建這兩種線程在運行結束后都會被虛擬機銷毀,如果線程數量多的話,頻繁的創建和銷毀線程會大大浪費時間和效率,更重要的是浪費內存,因為正常來說線程執行完畢后死亡,線程對象變成垃圾!那么有沒有一種方法能讓線程運行完后不立即銷毀,而是讓線程重復使用,繼續執行其他的任務哪?我們使用線程池就能很好地解決這個問題。
? ? ? ?我們接下來重點說明線程池類家族有哪些,線程池創建線程完成任務的實現原理是什么以及線程池的一些特性來進行分析。此文參考了https://www.cnblogs.com/dolphin0520/p/3932921.html
一.線程池家族
? ? ?線程池的最上層接口是Executor,這個接口定義了一個核心方法execute(Runnabel command),這個方法最后被ThreadPoolExecutor類實現,這個方法是用來傳入任務的。而且ThreadPoolExecutor是線程池的核心類,此類的構造方法如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);構造方法的參數及意義:
corePoolSize:核心線程池的大小,如果核心線程池有空閑位置,這是新的任務就會被核心線程池新建一個線程執行,執行完畢后不會銷毀線程,線程會進入緩存隊列等待再次被運行。
maximunPoolSize:線程池能創建最大的線程數量。如果核心線程池和緩存隊列都已經滿了,新的任務進來就會創建新的線程來執行。但是數量不能超過maximunPoolSize,否側會采取拒絕接受任務策略,我們下面會具體分析。
keepAliveTime:非核心線程能夠空閑的最長時間,超過時間,線程終止。這個參數默認只有在線程數量超過核心線程池大小時才會起作用。只要線程數量不超過核心線程大小,就不會起作用。
unit:時間單位,和keepAliveTime配合使用。
workQueue:緩存隊列,用來存放等待被執行的任務。
threadFactory:線程工廠,用來創建線程,一般有三種選擇策略。
ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue;handler:拒絕處理策略,線程數量大于最大線程數就會采用拒絕處理策略,四種策略為
ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程) ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務?
Executor接口有一個子接口ExecutorService,ExecutorService的實現類為AbstracExecutorService,而ThreadPoolExcutor正是AbstrcExecutorService的子類。
ThreadPoolExecutor還有兩個常用的方法shutdown和submit,兩者都用來關閉線程池,但是后者有一個結果返回。
?
?
二.線程池實現原理
線程池圖:
1.線程池狀態
線程池和線程一樣擁有自己的狀態,在ThreadPoolExecutor類中定義了一個volatile變量runState來表示線程池的狀態,線程池有四種狀態,分別為RUNNING、SHURDOWN、STOP、TERMINATED。
線程池創建后處于RUNNING狀態。
調用shutdown后處于SHUTDOWN狀態,線程池不能接受新的任務,會等待緩沖隊列的任務完成。
調用shutdownNow后處于STOP狀態,線程池不能接受新的任務,并嘗試終止正在執行的任務。
當線程池處于SHUTDOWN或STOP狀態,并且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束后,線程池被設置為TERMINATED狀態。
2.線程池任務的執行
當執行execute(Runnable command)方法后,傳入了一個任務,我們看一下execute方法的實現原理。
public void execute(Runnable command) {if (command == null)throw new NullPointerException();if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {if (runState == RUNNING && workQueue.offer(command)) {if (runState != RUNNING || poolSize == 0)ensureQueuedTaskHandled(command);}else if (!addIfUnderMaximumPoolSize(command))reject(command); } }整個方法的執行過程是這樣的,首先判斷任務是否為空,空拋空指針異常,否則執行下一個判斷,如果目前線程的數量小于核心線程池大小,就執行addIfUnderCorePollSize(command)方法,在核心線程池創建新的線程,并且執行這個任務。
我們看這個方法的具體實現:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {Thread t = null;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (poolSize < corePoolSize && runState == RUNNING)t = addThread(firstTask); //創建線程去執行firstTask任務 } finally {mainLock.unlock();}if (t == null)return false;t.start();return true; }這個方法首先是獲取線程池的鎖,參考別人的博客,說是和線程池狀態有關,沒有搞懂......,后面又進行一次判斷,判斷線程池線程數量和核心線程池的比較,前面execute()已經判斷過,這里為什么還要進行判斷哪?因為我們執行完Execute()中的判斷后,可能有新的任務進來了,并且為這個任務在核心線程池創建了新的線程去執行,如果剛好這是核心線程池滿了,那么就不能再加入新的縣城到核心線程池了。這種可能性是存在的,因為你不知道cpu什么時間分配給誰,所以我們要加一個判斷,至于線程池狀態為什么也要判斷,也是因為可能有其他線程執行了shutdown或者shutdownNow方法,導致線程池狀態不是RUNNING,那么線程池就停止接收新的任務,也就不會創建新的線程去執行這個任務了。
t=addThread(firstTask);這句代碼至關重要,我們看方法的實現代碼:
private Thread addThread(Runnable firstTask) {Worker w = new Worker(firstTask);Thread t = threadFactory.newThread(w); //創建一個線程,執行任務 if (t != null) {w.thread = t; //將創建的線程的引用賦值為w的成員變量 workers.add(w); //將當前任務添加到任務集int nt = ++poolSize; //當前線程數加1 if (nt > largestPoolSize)largestPoolSize = nt;}return t; }這個方法返回類型是Thread,所以我們可以新建一個線程并執行任務,之后將線程對象返回給外面的線程對象,然后執行t.start(),我們看到有一個Worker對象接收了任務,我們看Worker類的實現:
private final class Worker implements Runnable {private final ReentrantLock runLock = new ReentrantLock();private Runnable firstTask;volatile long completedTasks;Thread thread;Worker(Runnable firstTask) {this.firstTask = firstTask;}boolean isActive() {return runLock.isLocked();}void interruptIfIdle() {final ReentrantLock runLock = this.runLock;if (runLock.tryLock()) {try {if (thread != Thread.currentThread())thread.interrupt();} finally {runLock.unlock();}}}void interruptNow() {thread.interrupt();}private void runTask(Runnable task) {final ReentrantLock runLock = this.runLock;runLock.lock();try {if (runState < STOP &&Thread.interrupted() &&runState >= STOP)boolean ran = false;beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,用戶可以根據//自己需要重載這個方法和后面的afterExecute方法來進行一些統計信息,比如某個任務的執行時間等 try {task.run();ran = true;afterExecute(task, null);++completedTasks;} catch (RuntimeException ex) {if (!ran)afterExecute(task, ex);throw ex;}} finally {runLock.unlock();}}public void run() {try {Runnable task = firstTask;firstTask = null;while (task != null || (task = getTask()) != null) {runTask(task);task = null;}} finally {workerDone(this); //當任務隊列中沒有任務時,進行清理工作 }} }這個類實現了Runnable接口,所以會有run()方法,我們看到run中執行的還是傳入的任務,其實相當于調用傳入任務對象的run方法,我們之所以費力氣將任務對象加到Worker類中去執行,是因為這個線程執行之后會進入阻塞隊列等待被執行,這個線程的生命并沒有結束,這也正是我們使用線程池的最大原因。我們用一個Set集合存儲Worker,這樣不會有重復的任務被存儲,firstTask被執行完后進入緩存隊列,而這個新創建的線程就一直從緩存隊列中拿到任務去執行。這個方法為getTask(),所以我們來看看線程如何從緩存隊列拿到任務。
Runnable getTask() {for (;;) {try {int state = runState;if (state > SHUTDOWN)return null;Runnable r;if (state == SHUTDOWN) // Help drain queuer = workQueue.poll();else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果線程數大于核心池大小或者允許為核心池線程設置空閑時間,//則通過poll取任務,若等待一定的時間取不到任務,則返回nullr = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);elser = workQueue.take();if (r != null)return r;if (workerCanExit()) { //如果沒取到任務,即r為null,則判斷當前的worker是否可以退出if (runState >= SHUTDOWN) // Wake up othersinterruptIdleWorkers(); //中斷處于空閑狀態的workerreturn null;}// Else retry} catch (InterruptedException ie) {// On interruption, re-check runState}} }我們看到如果核心線程池中創建的線程想要拿到緩存隊列中的任務,先要判斷線程池的狀態,如果STOP或者TERMINATED,返回NULL,如果是RUNNING或者SHUTDOWN,則從緩存隊列中拿到任務去執行。
這就是核心線程池執行任務的原理。
那么如果線程數量超過核心線程池大小哪?我們回到executor()方法,如果發生這種情況,處理方式是?
if (runState == RUNNING && workQueue.offer(command))這段代碼意思是,如果線程數量超過核心線程池大小,先進行線程池狀態的判斷,如果是RUNNING,則將新進來的線程加入緩存隊列。如果失敗,往往是因為緩存隊列滿了或者線程池狀態不是RUNNING,就直接創建新的線程去執行任務,調用addIfUnderMaximumPoolSize(command),就會新創建線程,但是這個縣城不是核心線程池中的,是臨時擴展的,要保證線程數最大不超過線程池大小?maximumPoolSize,如果超過執行?reject(command);操作,拒絕接受新的任務。
還有如果任務已經加入緩存隊列成功還要繼續進行判斷
if (runState != RUNNING || poolSize == 0)這是為了防止在將任務加入緩存隊列的同時其他線程調用shutdown或者shutdownNow方法,所以采取了保護措施
ensureQueuedTaskHandled(command)?
我們看addIfUnderMaximumPoolSize的實現方法:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {Thread t = null;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (poolSize < maximumPoolSize && runState == RUNNING)t = addThread(firstTask);} finally {mainLock.unlock();}if (t == null)return false;t.start();return true; }這個方法和addIfUnderCorePoolSize基本一樣,只是方法中判斷條件改變了,這個方法是在緩沖隊列滿了并且線程池狀態是在RUNNING狀態下才會執行,里面的判斷條件是線程池數量小于線程池最大容量,并且線程池狀態是RUNNING。
?
我們進行總結:
- 如果當前線程池中的線程數目小于corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
- 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
- 如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理;
- 如果線程池中的線程數量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大于corePoolSize;如果允許為核心池中的線程設置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。
三.線程池使用示例
? ? ??
package cn.yqg.java;public class Task implements Runnable{private int num;public Task(int num) {this.num=num;}@Overridepublic void run() {System.out.println("正在執行任務 "+num);try {Thread.currentThread().sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("線程"+num+"執行完畢");} } package cn.yqg.java;import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;public class Test4 {public static void main(String[] args) {ThreadPoolExecutor pool=new ThreadPoolExecutor(5,10,200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));for(int i=0;i<15;i++) {Task task=new Task(i);pool.execute(task);System.out.println("線程池中線程數目:"+pool.getPoolSize()+",隊列中等待執行的任務數目:"+pool.getQueue().size()+",已執行玩別的任務數目:"+pool.getCompletedTaskCount());}pool.shutdown();} }一種可能情況:
正在執行任務 0 線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 正在執行任務 1 正在執行任務 2 線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 正在執行任務 3 線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 線程池中線程數目:5,隊列中等待執行的任務數目:1,已執行玩別的任務數目:0 正在執行任務 4 線程池中線程數目:5,隊列中等待執行的任務數目:2,已執行玩別的任務數目:0 線程池中線程數目:5,隊列中等待執行的任務數目:3,已執行玩別的任務數目:0 線程池中線程數目:5,隊列中等待執行的任務數目:4,已執行玩別的任務數目:0 線程池中線程數目:5,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 線程池中線程數目:6,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行任務 10 線程池中線程數目:7,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行任務 11 線程池中線程數目:8,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行任務 12 線程池中線程數目:9,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行任務 13 線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行任務 14 線程2執行完畢 線程1執行完畢 線程4執行完畢 線程3執行完畢 正在執行任務 8 線程0執行完畢 正在執行任務 9 正在執行任務 7 正在執行任務 6 正在執行任務 5 線程12執行完畢 線程13執行完畢 線程11執行完畢 線程10執行完畢 線程14執行完畢 線程7執行完畢 線程9執行完畢 線程8執行完畢 線程5執行完畢 線程6執行完畢從執行結果可以看出,當線程池中線程的數目大于5時,便將任務放入任務緩存隊列里面,當任務緩存隊列滿了之后,便創建新的線程。如果上面程序中,將for循環中改成執行超過15個任務,就會拋出任務拒絕異常了。
不過并不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來創建線程池:
Executors.newCachedThreadPool(); //創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE Executors.newSingleThreadExecutor(); //創建容量為1的緩沖池 Executors.newFixedThreadPool(int); //創建固定容量大小的緩沖池 public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); }從它們的具體實現來看,它們實際上也是調用了ThreadPoolExecutor,只不過參數都已配置好了。
newFixedThreadPool創建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置為1,也使用的LinkedBlockingQueue;
newCachedThreadPool將corePoolSize設置為0,將maximumPoolSize設置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就創建線程運行,當線程空閑超過60秒,就銷毀線程。
實際中,如果Executors提供的三個靜態方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。
另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。
from:?https://www.cnblogs.com/zzuli/p/9386463.html
總結
以上是生活随笔為你收集整理的线程池的使用(线程池重点解析)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java并发:线程池详解(ThreadP
- 下一篇: MySql 里的IFNULL、NULLI