多线程:当你提交任务时,线程队列已经满了,这时会发生什么?
?
首先我調用Executors創(chuàng)建的線程池出來的對象是ThreadPoolExecutor,ScheduledThreadPoolExecutor,DelegatedExecutorService這三個類中的一個! 而ScheduledThreadPoolExecutor是ThreadPoolExecutor的子類, DelegatedExecutorService是對ExecutorService進行一層包裝.
今天我們這里不討論每一種線程池是干什么的, 只討論RejectedExecutionException的原由, 所以我只拿ThreadPoolExecutor這個作為例子說了.
本文基于JDK1.8源碼進行分析
當我們調用Executors.newFixedThreadPool(int?nThreads )時會創(chuàng)建一個線程池給我. 源碼這個方法的實現(xiàn)是:
public static ExecutorService newFixedThreadPool(int nThreads) {
? ? ? ? return new ThreadPoolExecutor(nThreads, nThreads,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue<Runnable>());
? ? }
那么我跟進去這個構造方法
public ThreadPoolExecutor(int corePoolSize,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? int maximumPoolSize,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue<Runnable> workQueue) {
? ? ? ? this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
? ? ? ? ? ? ?Executors.defaultThreadFactory(), defaultHandler);
? ? }
?
public ThreadPoolExecutor(int corePoolSize,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? int maximumPoolSize,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue<Runnable> workQueue,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ThreadFactory threadFactory,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? RejectedExecutionHandler handler) {
? ? ? ? if (corePoolSize < 0 ||
? ? ? ? ? ? maximumPoolSize <= 0 ||
? ? ? ? ? ? maximumPoolSize < corePoolSize ||
? ? ? ? ? ? keepAliveTime < 0)
? ? ? ? ? ? throw new IllegalArgumentException();
? ? ? ? if (workQueue == null || threadFactory == null || handler == null)
? ? ? ? ? ? throw new NullPointerException();
? ? ? ? this.corePoolSize = corePoolSize;
? ? ? ? this.maximumPoolSize = maximumPoolSize;
? ? ? ? this.workQueue = workQueue;
? ? ? ? this.keepAliveTime = unit.toNanos(keepAliveTime);
? ? ? ? this.threadFactory = threadFactory;
? ? ? ? this.handler = handler;
? ? }
Ok,當我看到這里的時候, 我已經(jīng)看到我想要的了.RejectedExecutionHandler這個是拒絕執(zhí)行者. 那么我看看這個我傳進去的defaultHandler是什么, 它就是終止策略
private static final RejectedExecutionHandler defaultHandler =
? ? ? ? new AbortPolicy();
這個類的實現(xiàn)
public static class AbortPolicy implements RejectedExecutionHandler {
? ? ? ? /**
? ? ? ? ?* Creates an <tt>AbortPolicy</tt>.
? ? ? ? ?*/
? ? ? ? public AbortPolicy() { }
?
? ? ? ? /**
? ? ? ? ?* Always throws RejectedExecutionException.
? ? ? ? ?* @param r the runnable task requested to be executed
? ? ? ? ?* @param e the executor attempting to execute this task
? ? ? ? ?* @throws RejectedExecutionException always.
? ? ? ? ?*/
? ? ? ? public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
? ? ? ? ? ? throw new RejectedExecutionException();
? ? ? ? }
? ? }
直接拋這個異常出去. 那么我明白了, 就是因為我的線程池的拒絕策略是AbortPolicy,所以會導致拋異常!??好, 那么現(xiàn)在我知道為什么拋異常了, 是不是就夠了呢? 一般來說用來解決問題是夠了, 但是我還想研究下什么情況下會拋這個異常, 和它的終極解決方案!
我們先來看看線程池的四種拒絕策略:
?注:?當線程池的飽和策略,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務,必須采取一種策略處理該任務,線程池提供了4種策略:
ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常。注:默認策略!!!!!!
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執(zhí)行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
?
1:解決方案的探索非常簡單,??無非就是RejectedExecutionHandler嘛, 我Ctrl+T看看他有什么實現(xiàn)類就Ok了嘛,??它有四個實現(xiàn)類.AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy.?
關于AbortPolicy我剛才已經(jīng)說了,它的拒絕策略就是拋異常. 說說下面三個.
CallerRunsPolicy這個的拒絕策略是如果線程池沒有shutDown,就會執(zhí)行需要執(zhí)行的Runnable
?
public static class CallerRunsPolicy implements RejectedExecutionHandler {
? ? ? ? /**
? ? ? ? ?* Creates a <tt>CallerRunsPolicy</tt>.
? ? ? ? ?*/
? ? ? ? public CallerRunsPolicy() { }
?
? ? ? ? /**
? ? ? ? ?* Executes task r in the caller's thread, unless the executor
? ? ? ? ?* has been shut down, in which case the task is discarded.
? ? ? ? ?* @param r the runnable task requested to be executed
? ? ? ? ?* @param e the executor attempting to execute this task
? ? ? ? ?*/
? ? ? ? public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
? ? ? ? ? ? if (!e.isShutdown()) {
? ? ? ? ? ? ? ? r.run();
? ? ? ? ? ? }
? ? ? ? }
? ? }
所以,我們解決拋異常的就直接用CallerRunsPolicy這個策略就可以了.
再來DiscardPolicy, 這個策略就是忽略, 即你隨意提交任務, 我不鳥你就完了.
public static class DiscardPolicy implements RejectedExecutionHandler {
? ? ? ? /**
? ? ? ? ?* Creates a <tt>DiscardPolicy</tt>.
? ? ? ? ?*/
? ? ? ? public DiscardPolicy() { }
?
? ? ? ? /**
? ? ? ? ?* Does nothing, which has the effect of discarding task r.
? ? ? ? ?* @param r the runnable task requested to be executed
? ? ? ? ?* @param e the executor attempting to execute this task
? ? ? ? ?*/
? ? ? ? public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
? ? ? ? }
? ? }
DiscardOldestPolicy策略是說忽略最老的任務,然后執(zhí)行我們提交的.
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
? ? ? ? /**
? ? ? ? ?* Creates a <tt>DiscardOldestPolicy</tt> for the given executor.
? ? ? ? ?*/
? ? ? ? public DiscardOldestPolicy() { }
?
? ? ? ? /**
? ? ? ? ?* Obtains and ignores the next task that the executor
? ? ? ? ?* would otherwise execute, if one is immediately available,
? ? ? ? ?* and then retries execution of task r, unless the executor
? ? ? ? ?* is shut down, in which case task r is instead discarded.
? ? ? ? ?* @param r the runnable task requested to be executed
? ? ? ? ?* @param e the executor attempting to execute this task
? ? ? ? ?*/
? ? ? ? public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
? ? ? ? ? ? if (!e.isShutdown()) {
? ? ? ? ? ? ? ? e.getQueue().poll();
? ? ? ? ? ? ? ? e.execute(r);
? ? ? ? ? ? }
? ? ? ? }
? ? }
從實現(xiàn)也可以看出來, 把隊列里面頂部的彈掉, 再執(zhí)行我們的任務
大家可以根據(jù)不同所需, 在創(chuàng)建線程池之后, 用你們的ThreadPoolExecutor.setRejectedExecutionHandler(RejectedExecutionHandler handler)去設置你們的拒絕策略
2:那么本文還要繼續(xù)探索何時這些拒絕策略會被調用呢? 我以ThreadPoolExecutor的execute()方法說事了, ScheduledThreadPoolExecutor的submit方法是大同小異的,請大家自己跟代碼去吧.
execute方法的javadoc中有這么一句話:
If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached,
the task is handled by the current RejectedExecutionHandler
看看execute的實現(xiàn)先
public void execute(Runnable command) {
? ? ? ? if (command == null)
? ? ? ? ? ? throw new NullPointerException();
? ? ? ? if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
? ? ? ? ? ? if (runState == RUNNING && workQueue.offer(command)) {
? ? ? ? ? ? ? ? if (runState != RUNNING || poolSize == 0)
? ? ? ? ? ? ? ? ? ? ensureQueuedTaskHandled(command);
? ? ? ? ? ? }
? ? ? ? ? ? else if (!addIfUnderMaximumPoolSize(command))
? ? ? ? ? ? ? ? reject(command); // is shutdown or saturated
? ? ? ? }
? ? }
這里一共涉及到三個方法,addIfUnderCorePoolSize,ensureQueuedTaskHandled,addIfUnderMaximumPoolSize
只要執(zhí)行到reject(command)這里就會調用我們那個Handler的rejectedExecution()方法.
那三個方法以及javadoc都告訴我們,??如果這個線程池已經(jīng)shutdown或者容量滿了之后, 就會調用拒絕策略.. 請注意,. 從實現(xiàn)來看,??這個容量滿了是指當前的線程池以及其緩沖隊列的容量都滿了 才是滿了, 而不是線程池的數(shù)量。
既然提到阻塞隊列已滿說明不是LinkedBlockingQueue!因為LinkedBlockingQueue是近似無界的!
由于默認是AbortPolicy拒絕策略,因此極可能是拋出RejectedExecutionException異常,如果是其它的策略就要另當別論!面試是要闡述清楚!
?
總結
以上是生活随笔為你收集整理的多线程:当你提交任务时,线程队列已经满了,这时会发生什么?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 多线程:保证三个线程依次按顺序执行?ne
- 下一篇: 多线程:Immutable对象?如何创建