谈谈 ForkJoin 框架的设计与实现
在了解Fork-Join之前,我們得先了解什么是并行計(jì)算。
并行計(jì)算
相對(duì)于串行計(jì)算,并行計(jì)算可以劃分成時(shí)間并行和空間并行。時(shí)間并行即指令流水化,也就是流水線技術(shù)。比如說生產(chǎn)一輛小汽車,有特定的輪子車間/發(fā)動(dòng)機(jī)車間,同時(shí)進(jìn)行各自的生產(chǎn)。空間并行是指使用多個(gè)處理器執(zhí)行并發(fā)計(jì)算。
以程序和算法設(shè)計(jì)人員的角度看,并行計(jì)算又可分為數(shù)據(jù)并行和任務(wù)并行。數(shù)據(jù)并行把大的任務(wù)化解成若干個(gè)相同的子任務(wù),任務(wù)并行是指每一個(gè)線程執(zhí)行一個(gè)分配到的任務(wù),而這些線程則被分配(通常是操作系統(tǒng)內(nèi)核)到該并行計(jì)算體系的各個(gè)計(jì)算節(jié)點(diǎn)中去。
簡(jiǎn)單來說,并行計(jì)算是通過把大問題劃分為小問題,運(yùn)用計(jì)算機(jī)資源并行的處理子問題,當(dāng)需要得到大問題的結(jié)果時(shí),將小問題的結(jié)果按順序合并起來得到最終結(jié)果。這種思想就是分治思想,小到歸并排序,大到大數(shù)據(jù)計(jì)算...
?
Fork-Join
Fork-Join框架是Doug Lea 大神在JDK7引入的。Fork就是把大問題拆分成小問題,也就是大任務(wù)拆成多個(gè)子任務(wù),并行執(zhí)行子任務(wù)。Join就是把任務(wù)的結(jié)果按順序合并起來。
假設(shè)我們需要求從 1-1億之間的數(shù)字和,按照Fork-Join的思想,可分為以下三步:
Step1.定義拆分子任務(wù)和合并子任務(wù)的規(guī)則
-
劃分子任務(wù)的規(guī)則
首先將任務(wù)拆為 1-5千萬(wàn) 和 5千萬(wàn)01 - 1億兩個(gè)子任務(wù),直到每個(gè)子任務(wù)計(jì)算的數(shù)字范圍在1萬(wàn)以內(nèi)的時(shí)候,我們才計(jì)算這個(gè)任務(wù)的和。
-
合并子任務(wù)的規(guī)則
同一父任務(wù)的所有子任務(wù)的結(jié)果再相加,就是這一父任務(wù)的結(jié)果。
Step2.充分利用計(jì)算機(jī)資源,最大并行的執(zhí)行子任務(wù)
Step3.充分利用計(jì)算機(jī)資源,執(zhí)行合并所有子任務(wù),獲得最終的結(jié)果
顯然一般人做不了后兩步,我們只需要把?怎么拆,怎么和?告訴Fork-Join框架,Fork-Join框架就幫我們做好?如何最大并行執(zhí)行子任務(wù)?和?如何最有效合并子任務(wù)。
設(shè)計(jì)原理
如何充分利用計(jì)算機(jī)資源,最大并行執(zhí)行子任務(wù)?
一般小伙伴應(yīng)該可以想到使用多線程,讓線程數(shù)等于CPU核數(shù)。此時(shí)可以充分利用CPU的計(jì)算資源。
我們來看一下JDK普通線程池是咋玩的。(不要說你不懂為啥池化 :)
任務(wù)都是丟到一個(gè)同步隊(duì)列BlockingQueue中的。如果你了解JDK BlockingQueue的實(shí)現(xiàn),就知道有界的同步隊(duì)列都是用鎖阻塞的,有些push/poll操作還共用一把鎖。
問題1:并行的任務(wù)有必要共用一個(gè)阻塞隊(duì)列嗎?
問題2: 如果任務(wù)隊(duì)列中的任務(wù)存在依賴,worker線程只能被阻塞著。啥意思呢?
假設(shè)任務(wù)隊(duì)列中存在兩個(gè)任務(wù)task1和task2,task1的執(zhí)行結(jié)果依賴于task2的結(jié)果。如果worker1先拉取到task1,結(jié)果發(fā)現(xiàn)此時(shí)task2還沒有被執(zhí)行。則worker1只能阻塞等待別的worker拉取到task2,task2執(zhí)行完了worker1才能繼續(xù)執(zhí)行task1。
如果worker1當(dāng)發(fā)現(xiàn)task1無法繼續(xù)執(zhí)行下去時(shí),能夠先把它放一邊,繼續(xù)拉取任務(wù)執(zhí)行。這樣效率是比較高的。
Work?Stealing
Fork-Join框架通過Work?Stealing算法解決上面兩個(gè)問題。
-
每個(gè)線程擁有自己的任務(wù)隊(duì)列,并且是雙端隊(duì)列。
-
線程操作自己的任務(wù)隊(duì)列是LIFO(Last in First out)模式。
-
線程還可以偷取別的線程任務(wù)隊(duì)列中的任務(wù),模式為FIFO(First in First out)。
顯然?每個(gè)線程擁有自己的任務(wù)隊(duì)列可以提高獲取隊(duì)列的并行度。
雙端任務(wù)隊(duì)列將所屬的自己線程的push/pop操作?和?其他線程的steal操作通過不同的模式區(qū)分開。這樣只有當(dāng)Base==Top-1時(shí),pop操作和steal操作才會(huì)有沖突。
如何才能準(zhǔn)確及時(shí)知道Base==Top-1呢,Fork-Join框架的牛逼之處也在于對(duì)任務(wù)的調(diào)度是輕量級(jí)的。
steal操作
考慮steal操作,是多個(gè)其他線程的同步操作。需要保證:偷到Base處的任務(wù)和Base++的原子性,同時(shí)Base的值一旦改變,其他線程應(yīng)該能夠馬上可見。聰明的小伙伴是不是想到?鎖和volatile?了:)
//steal操作 就是 poll()方法 final ForkJoinTask<?> poll() {ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t; //array就是雙端隊(duì)列,實(shí)際用數(shù)組實(shí)現(xiàn)。 //base是將要偷的任務(wù)下標(biāo),base是用volatile修飾的,保證可見性 //top是將要push進(jìn)去的任務(wù)下標(biāo),可參考上面示意圖while ((b = base) - top < 0 && (a = array) != null) {//說明經(jīng)過while條件初步判斷任務(wù)隊(duì)列不為空 //獲取base處的任務(wù)在任務(wù)隊(duì)列中的偏移量int j = (((a.length - 1) & b) << ASHIFT) + ABASE; //用volatile load 語(yǔ)義取出base處的任務(wù)t,可以簡(jiǎn)單理解為一定是最新修改版本的任務(wù)t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); //再次讀取base,判斷此時(shí)t是否被別的線程偷走if (base == b) {if (t != null) { //如果多次讀判斷都沒啥問題,CAS修改base處的任務(wù)t為nullif (U.compareAndSwapObject(a, j, t, null)) { //如果上面修改成功,表示這個(gè)任務(wù)被該線程偷到了 //此時(shí)就將base指針向前移一位,注意這一步是原子操作,base++就不是了base = b + 1;return t;}}else if (b + 1 == top) // 如果t==null && b + 1 == top,此時(shí)任務(wù)隊(duì)列為空break;}}return null;}簡(jiǎn)單來說,有任務(wù)可偷時(shí),通過CAS偷任務(wù)保證只有一個(gè)線程能偷成功,偷成功的這個(gè)線程接著修改volatile base指針,使得馬上對(duì)其他線程可見。同時(shí)通過前面的多次讀判斷減少后期CAS并發(fā)的沖突概率。沒任務(wù)可偷時(shí),通過CAS偷任務(wù)失敗可以判斷出來。
請(qǐng)小伙伴一句句看上面的代碼,阿姨都注釋出來了。雖然上面并沒有鎖,,但是小伙伴想想鎖其實(shí)是悲觀控制并發(fā)的思想,是不是可以拆成多次讀判斷 + CAS原子修改的樂觀思想來控制并發(fā)。只要最終保證只有一個(gè)能修改成功就可以了。
push操作
考慮push操作,是任務(wù)隊(duì)列所屬的線程才能操作,天生線程安全:不需要通過CAS或鎖來保證同步,只需要原子的修改top處任務(wù)?和?top向前移一位?就可以了。同理,top也不需要用volatile修飾。
final void push(ForkJoinTask<?> task) {ForkJoinTask<?>[] a; ForkJoinPool p;int b = base, s = top, n;if ((a = array) != null) { // ignore if queue removedint m = a.length - 1; // fenced write for task visibility //更新雙端隊(duì)列array的top處任務(wù)為task,直接原子更新,非CAS操作 //因?yàn)檫@個(gè)方法只會(huì)被array所屬的線程調(diào)用,所以這里是線程安全的U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); //top指針向前移一位U.putOrderedInt(this, QTOP, s + 1);if ((n = s - b) <= 1) { //說明未push前隊(duì)列中最多有一個(gè)任務(wù)if ((p = pool) != null) //此時(shí)喚醒其他等待的線程,表示整體pool中有事情可以做了。。p.signalWork(p.workQueues, this);}else if (n >= m) //隊(duì)列擴(kuò)容growArray();}}小伙伴思考下,這里Base和Top指針會(huì)存在任務(wù)沖突嗎?其實(shí)不會(huì)哦,因?yàn)閮蓚€(gè)指針都在往前沖,Base永遠(yuǎn)追趕不上Top。這個(gè)方法額外需要做的事情?是?喚醒空閑線程?表示有任務(wù)進(jìn)來了, 判斷隊(duì)列是否需要擴(kuò)容就好。
pop操作
考慮pop操作,雖然任務(wù)隊(duì)列所屬的線程才能操作,但是當(dāng)任務(wù)隊(duì)列只有一個(gè)任務(wù)時(shí),存在steal操作和pop操作的任務(wù)競(jìng)爭(zhēng)。原理就和steal操作一致了,當(dāng)CAS修改top-1處任務(wù)為空 成功時(shí),再更新top值為top-1。
final ForkJoinTask<?> pop() {ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;if ((a = array) != null && (m = a.length - 1) >= 0) {for (int s; (s = top - 1) - base >= 0;) {long j = ((m & s) << ASHIFT) + ABASE;if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)break;if (U.compareAndSwapObject(a, j, t, null)) {U.putOrderedInt(this, QTOP, s);return t;}}}return null;}注意這個(gè)pop操作并沒有steal操作那么多次預(yù)讀避免并發(fā)競(jìng)爭(zhēng),小姐姐yy是因?yàn)閜op操作只有在任務(wù)隊(duì)列中只有一個(gè)任務(wù)時(shí),才會(huì)存在和Steal操作的競(jìng)爭(zhēng)問題。而Steal操作也時(shí)時(shí)可能存在多個(gè)其他線程的競(jìng)爭(zhēng)問題的。
通過上面三個(gè)任務(wù)調(diào)度方法的分析,你有沒有感受到一絲絲FJ的調(diào)度輕量級(jí)呢?
總結(jié)一下:Fork-Join框架通過將共享的任務(wù)隊(duì)列拆分成線程獨(dú)有的雙端任務(wù)隊(duì)列,多線程steal操作通過多次讀和CAS保證同步,steal操作和pop操作??通過CAS?保證同步,push操作線程安全,不需要同步。
問題是什么時(shí)候線程消費(fèi)自己的任務(wù)隊(duì)列中的任務(wù),什么時(shí)候會(huì)去偷別的線程的任務(wù),一個(gè)任務(wù)在Fork-Join框架中的生命周期是怎樣的,又是怎么流轉(zhuǎn)的?
?
Fork-Join框架使用
要能回答上面的問題,我們先看一下如何使用Fork-Join框架。上面這三個(gè)方法并不是我們能直接調(diào)用的,這三個(gè)方法是Fork-Join自己在合適的時(shí)機(jī)自己調(diào)用的。像最開始所說,使用者只需要:定義好拆分子任務(wù)和合并子任務(wù)的規(guī)則的大任務(wù),并且把任務(wù)丟給ForkJoinPool就好
求 1-1億之間的數(shù)字和
Step1.定義一個(gè)求和的任務(wù)類
繼承RecursiveTask類,重寫其compute()方法:
RecursiveTask如其名,是一個(gè)歸并任務(wù)。compute()方法是具體如何拆分,如何歸并的實(shí)現(xiàn)。fork()方法就是在確定拆分子任務(wù)規(guī)則時(shí)調(diào)用的,該方法會(huì)把子任務(wù)push到當(dāng)前線程自己的任務(wù)隊(duì)列中;join()方法就是在確定合并子任務(wù)的規(guī)則時(shí)調(diào)用的,該方法會(huì)等待直到返回子任務(wù)的結(jié)果。
public class SumTask extends RecursiveTask<Long> {private long[] numbers;private int from;private int to;public SumTask(long[] numbers, int from, int to) {this.numbers = numbers;this.from = from;this.to = to;}@Overrideprotected Long compute() {//拆分子任務(wù)的規(guī)則:// 1.當(dāng)需要計(jì)算的數(shù)字小于6時(shí),直接計(jì)算結(jié)果if (to - from < 6) {long total = 0;for (int i = from; i <= to; i++) {total += numbers[i];}return total;// 2.否則,把任務(wù)一分為二,遞歸計(jì)算} else {int middle = (from + to) / 2;//構(gòu)造子任務(wù)SumTask taskLeft = new SumTask(numbers, from, middle);SumTask taskRight = new SumTask(numbers, middle+1, to);//將子任務(wù)添加到任務(wù)隊(duì)列,這一步我們還是要做了taskLeft.fork();taskRight.fork();//合并所有子任務(wù)的規(guī)則:所有子任務(wù)的結(jié)果相加return taskLeft.join() + taskRight.join();}}}在等待子任務(wù)結(jié)果的時(shí)候,線程被阻塞了嗎?
(當(dāng)然沒有,這段時(shí)間其實(shí)就會(huì)偷任務(wù)來做。后面我們?cè)俜治?#xff1a;)
Step2.構(gòu)造一個(gè)Fork-Join線程池,把上面的求和大任務(wù)SumTask丟進(jìn)去
public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool();SumTask sumTask = new SumTask(numbers, 0, numbers.length-1)long result = pool.invoke(sumTask);System.out.println(result); }從這里,我們可以看到任務(wù)丟進(jìn)線程池是調(diào)用的pool.invoke(sumTask)
( 熟悉JDK線程池實(shí)現(xiàn)的小伙伴可以結(jié)合上面ForkJoin框架的原理想想任務(wù)該如何流轉(zhuǎn)。小姐姐開始了:)
一個(gè)歸并任務(wù)的流轉(zhuǎn)
Step1.任務(wù)提交到任務(wù)隊(duì)列
包括invoke等所有任務(wù)提交方法最終都會(huì)調(diào)用ForkJoinPool.externalPush方法。
這里面需要考慮將任務(wù)提交到哪個(gè)隊(duì)列?
如果提交到ForkJoinWorkerThread自己的雙端任務(wù)隊(duì)列中:不管提交到頭還是尾,都會(huì)和我們上面分析的三個(gè)操作發(fā)生任務(wù)沖突。而且如何選擇負(fù)載最小的線程來提交也會(huì)增加問題復(fù)雜性。
ForkJoinPool中雙端任務(wù)隊(duì)列是用數(shù)組(volatile WorkQueue[] workQueues)實(shí)現(xiàn)的,其中奇數(shù)下標(biāo)存放的是可激活的任務(wù)隊(duì)列,偶數(shù)下標(biāo)存放的是不可激活的任務(wù)隊(duì)列。激活指的是這個(gè)隊(duì)列是否是某個(gè)ForkJoin線程的任務(wù)隊(duì)列。
ForkJoinPool.externalPush只能將任務(wù)提交到不可激活任務(wù)隊(duì)列,該方法的主要邏輯為:
當(dāng)提交的任務(wù)是pool的第一個(gè)任務(wù)時(shí),會(huì)初始化workQueues,ForkJoinWorkerThread等資源,通過hash算法選擇一個(gè)偶數(shù)下標(biāo)的workQueue,在TOP處放入任務(wù)。同時(shí)喚醒ForkJoinWorkerThread開始拉取任務(wù)工作。
當(dāng)提交的任務(wù)不是第一個(gè)任務(wù),此時(shí)workQueues等資源已初始化好。同樣需要選擇一個(gè)偶數(shù)下標(biāo)的workQueue存放任務(wù),如果選中的workQueue只有這一個(gè)任務(wù),說明之前線程資源大概率是閑置的狀態(tài),會(huì)嘗試?喚醒(signalWork方法)?一個(gè)空閑的ForkJoinWorkerThread開始拉取任務(wù)工作。
Step2.ForkJoinWorkerThread的運(yùn)行
我們先看一下任務(wù)的生產(chǎn)和消費(fèi)模式:
可激活的workQueue自己所屬ForkJoinWorkerThread的任務(wù)模式是LIFO(Last In First Out)
不可激活的workQueue的任務(wù)模式是FIFO(First In First Out)
ForkJoinWorkerThread剛開始運(yùn)行時(shí)會(huì)調(diào)用ForkJoinWorkerThread.scan方法隨機(jī)選取一個(gè)隊(duì)列從Base處撈取任務(wù).撈取到任務(wù)會(huì)調(diào)用WorkQueue.runTask方法執(zhí)行任務(wù),最終對(duì)于RecursiveTask任務(wù)執(zhí)行的是RecursiveTask.exec方法。
protected final boolean exec() { //我們一開始定義SumTask的實(shí)現(xiàn)方法:computeresult = compute();return true;}里面調(diào)用的就是我們一開始定義SumTask的實(shí)現(xiàn)方法:compute方法。
fork所做的事情就是將我們切分的子任務(wù)添加到當(dāng)前ForkJoinWorkerThread自己的workQueue中
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)((ForkJoinWorkerThread)t).workQueue.push(this);join所做的事情就是等待子任務(wù)的返回結(jié)果
public final V join() {int s;//doJoin會(huì)返回執(zhí)行結(jié)果if ((s = doJoin() & DONE_MASK) != NORMAL)//結(jié)果有異常,拋出異常信息reportException(s);//結(jié)果無異常,返回正常結(jié)果return getRawResult();}講原理的時(shí)候我們提到了當(dāng)調(diào)用join獲取任務(wù)結(jié)果時(shí),ForkJoinWorkerThread會(huì)根據(jù)當(dāng)前任務(wù)的情況,做出最正確的執(zhí)行判斷,而不是單純的阻塞等待結(jié)果。
Step3.join時(shí)執(zhí)行任務(wù)的判斷
結(jié)合上面求和的例子,我們來看一下求1-10之間的數(shù)字和的求和任務(wù)的可能join過程:
case1:任務(wù)未被偷
假設(shè)求和?1-10任務(wù)被Thread1執(zhí)行,fork出兩個(gè)子任務(wù):1-5?和?6-10,只要Thread1能判斷出來要join的任務(wù)在自己的任務(wù)隊(duì)列中,那當(dāng)前join哪個(gè)子任務(wù)就把它取出來執(zhí)行就可以。
case2:任務(wù)被偷,此時(shí)自己的任務(wù)隊(duì)列為空,可以幫助小偷執(zhí)行它未完成的任務(wù)
假設(shè)求和?1-10任務(wù)被Thread1執(zhí)行,fork出兩個(gè)子任務(wù):1-5?和?6-10。6-10已成功執(zhí)行完成,join返回了結(jié)果。但此時(shí)發(fā)現(xiàn)1-5被Thread2偷走了,自己的任務(wù)隊(duì)列中已經(jīng)沒有任務(wù)可以執(zhí)行了。此時(shí)Thread1可以找到小偷Thread2,并偷取Thread2的10-20任務(wù)來幫助它執(zhí)行。
case3:任務(wù)被偷,此時(shí)自己的任務(wù)隊(duì)列不為空
假設(shè)求和?1-10任務(wù)被Thread1執(zhí)行,fork出兩個(gè)子任務(wù):1-5?和?6-10,要join?1-5時(shí)發(fā)現(xiàn)已經(jīng)被Thread2偷走了,而自己隊(duì)列中還有6-10等待join執(zhí)行。不好意思幫不了小偷了。
只好嘗試掛起自己等待1-5的執(zhí)行結(jié)果通知,并嘗試喚醒空閑線程或者創(chuàng)建新的線程替代自己執(zhí)行任務(wù)隊(duì)列中的6-10任務(wù)。
上述三種情況代碼均在ForkJoinPool.awaitJoin方法中。整體思路是:
當(dāng)任務(wù)還在自己的隊(duì)列:
-
自己執(zhí)行,獲取結(jié)果。
當(dāng)被別人偷走阻塞了:
-
自己又沒任務(wù)執(zhí)行,就幫助小偷執(zhí)行任務(wù)。
-
自己有任務(wù)要執(zhí)行,就嘗試掛起自己等待小偷的反饋結(jié)果,同時(shí)找隊(duì)友幫助自己執(zhí)行。
這里任務(wù)模式有意思的是:
scan/steal操作都是從Base處獲取任務(wù),那么更容易獲取到大的任務(wù)執(zhí)行,從而使得整體線程的資源分配更加均衡。
任務(wù)隊(duì)列所屬的線程是LIFO的任務(wù)生產(chǎn)消費(fèi)模式,剛好符合遞歸任務(wù)的執(zhí)行順序。
?
至此你有沒有對(duì)ForkJoin框架的輕量級(jí)調(diào)度和Work?Stealing算法有一些了解呀:)
參考資料:
[1].http://gee.cs.oswego.edu/dl/papers/fj.pdf
[2].https://juejin.im/entry/5a027e2bf265da43247fdef7
[3].https://www.jianshu.com/p/f777abb7b251
[4].http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/
[5].https://zhuanlan.zhihu.com/p/38204373
[6].https://zhuanlan.zhihu.com/p/68554017
[7].https://zh.wikipedia.org/wiki/%E5%B9%B6%E8%A1%8C%E8%AE%A1%E7%AE%97
總結(jié)
以上是生活随笔為你收集整理的谈谈 ForkJoin 框架的设计与实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何选择 Git 分支模式?
- 下一篇: 厉害了,如何通过双 key 来解决缓存并