Java8 PriorityBlockingQueue源码分析
在看這篇總結(jié)之前,建議大家先熟悉一下 PriorityQueue,這里主要介紹 PriorityBlockingQueue 一些特殊的性質(zhì),關(guān)于優(yōu)先級(jí)隊(duì)列的知識(shí)不作著重介紹,因?yàn)檫^(guò)程與 PriorityQueue 都是一致的。
關(guān)于 PriorityQueue 的文章,你可以參考這里->點(diǎn)擊前往~
PriorityBlockingQueue 相關(guān)源碼分析
add 方法
public boolean add(E e) {return offer(e);}add 方法主要調(diào)用的是 offer 方法,下面我們來(lái)看 offer 方法。
public boolean offer(E e) {// 隊(duì)列所有的元素不允許為 nullif (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;// 加鎖lock.lock();int n, cap;Object[] array;// 判斷是否需要擴(kuò)容while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;// 如果不自定義比較器,則默認(rèn)為一個(gè)小頂堆,從下往上判斷進(jìn)行調(diào)整if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;// 喚醒非空條件對(duì)象notEmpty.signal();} finally {// 釋放鎖lock.unlock();}return true;}offer 方法整體的流程并不是復(fù)雜,首先加鎖,然后判斷是否需要擴(kuò)容,接著添加元素,添加元素也分了兩種情況,一種是沒(méi)有自定義比較器,默認(rèn)是小頂堆,如果初始化了自定義比較器,則按照自定義比較器的邏輯添加元素,因?yàn)樘砑恿嗽?#xff0c;隊(duì)列肯定不為空,因此要喚醒 notEmpty 條件。
我們以不自定義比較器為例,看一下 siftUpComparable 方法是如何調(diào)整堆結(jié)構(gòu)的。
private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (key.compareTo((T) e) >= 0)break;array[k] = e;k = parent;}array[k] = key;}siftUpComparable 方法與 PriorityQueue 中對(duì)應(yīng)的方法簡(jiǎn)直是一模一樣,放一張圖在這,就不具體介紹了。
其中比較有意思的是 tryGrow 擴(kuò)容方法,我們接下來(lái)看一下這個(gè)方法。
/*** Q:擴(kuò)容操作為什么要允許多個(gè)線程進(jìn)來(lái)呢?* A:如果整個(gè)擴(kuò)容過(guò)程還加鎖的話,其他線程是不能修改隊(duì)列的,* 只能等待擴(kuò)容完后才能繼續(xù)執(zhí)行,并發(fā)效率比較低*/ private void tryGrow(Object[] array, int oldCap) {// 釋放鎖lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;/*** compareAndSwapInt:** this:當(dāng)前對(duì)象的引用* allocationSpinLockOffset:allocationSpinLock 在內(nèi)存中的偏移量* 0:allocationSpinLock 的預(yù)期值* 1:更新值*/if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {try {// 當(dāng)容量小于 64 時(shí)容量為原來(lái)的兩倍 + 2,如果大于等于 64 時(shí)擴(kuò)容為原來(lái)的 1.5 倍// 與 PriorityQueue 一致int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}// 初始化新的數(shù)組if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {// allocationSpinLock 置 0// 因此后面的線程獲取鎖也可能會(huì)嘗試 CAS 成功,然后初始化新數(shù)組allocationSpinLock = 0;}}/*** 如果當(dāng)前線程嘗試 CAS 失敗,則嘗試讓步* Q:這里為什么要讓步?* A:因?yàn)樽约翰皇浅晒Τ跏蓟聰?shù)組的線程,就算獲取到了線程也不能正確擴(kuò)容,* 因此讓步盡量讓成功擴(kuò)容的線程獲取鎖*/if (newArray == null) // back off if another thread is allocatingThread.yield();/*** Q:在加鎖之前,可能由多個(gè)數(shù)組嘗試 CAS 成功,且成功的初始化了新的數(shù)組,* 那么是不是后面的新數(shù)組會(huì)覆蓋前面的數(shù)組呢?* A:當(dāng)然答案肯定是不會(huì)的,那么是如何保證正確性的呢?關(guān)鍵在于 queue == array 判斷,* 因此只有第一個(gè)判斷成功的線程能正確擴(kuò)容,其他非第一個(gè)線程再進(jìn)行判斷的時(shí)候會(huì)返回 false,* 自然不會(huì)進(jìn)行數(shù)組元素拷貝*/lock.lock();if (newArray != null && queue == array) {// 重置隊(duì)列內(nèi)部數(shù)組queue = newArray;// 元素拷貝,同 PriorityQueueSystem.arraycopy(array, 0, newArray, 0, oldCap);}}這個(gè)方法比較特殊的地方在于先釋放了鎖,然后通過(guò) CAS 操作判斷是否需要初始化新數(shù)組,嘗試 CAS 失敗的線程,會(huì)做出一個(gè)讓步,放棄 CPU 時(shí)間片,然后與其他線程一同競(jìng)爭(zhēng)。這個(gè)過(guò)程我們可以思考以下幾個(gè)問(wèn)題:
- 為什么不直接加鎖而是通過(guò) CAS 加判斷操作完成擴(kuò)容步驟
- 為什么嘗試 CAS 失敗的線程需要讓步
- 在多線程情況下可能會(huì)有多個(gè)線程初始化新數(shù)組,那如何保證操作一致性
這些問(wèn)題在上面的方法里都總結(jié)了一些自己的想法,如果大家有不同的見(jiàn)解可以留言交流。
take 方法
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;// 加鎖(可響應(yīng)中斷)lock.lockInterruptibly();E result;try {// 如果隊(duì)列為空,take 方法會(huì)阻塞出隊(duì)線程while ( (result = dequeue()) == null)/*** 如果隊(duì)列中沒(méi)有元素,會(huì)阻塞后續(xù)調(diào)用 take 方法出隊(duì)的線程* 直到隊(duì)列添加了元素后喚醒 notEmpty,才可以繼續(xù)執(zhí)行*/notEmpty.await();} finally {// 釋放鎖lock.unlock();}return result;}take 方法中調(diào)用了 dequeue 方法,如下:
private E dequeue() {int n = size - 1;if (n < 0)return null;else {Object[] array = queue;// 堆頂?shù)脑?/span>E result = (E) array[0];// 堆最底層的元素(最后一個(gè))E x = (E) array[n];// 把最后一個(gè)元素置 null,因?yàn)橐阉诺蕉秧?#xff0c;向下逐步調(diào)整堆結(jié)構(gòu),與 PriorityQueue 一致array[n] = null;Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(0, x, array, n);elsesiftDownUsingComparator(0, x, array, n, cmp);size = n;return result;}}我們還以不自定義比較器為例,看下 siftDownComparable 方法。
private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {if (n > 0) {Comparable<? super T> key = (Comparable<? super T>)x;int half = n >>> 1; // loop while a non-leafwhile (k < half) {int child = (k << 1) + 1; // assume left child is leastObject c = array[child];int right = child + 1;if (right < n &&((Comparable<? super T>) c).compareTo((T) array[right]) > 0)c = array[child = right];if (key.compareTo((T) c) <= 0)break;array[k] = c;k = child;}array[k] = key;}}過(guò)程與 ProrityQueue 還是一樣的,就不分析了,放一張圖幫助大家理解吧。
 (完)
總結(jié)
以上是生活随笔為你收集整理的Java8 PriorityBlockingQueue源码分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
 
                            
                        - 上一篇: 焦糖炖蛋怎么做好吃
- 下一篇: 花溪牛肉粉什么时候创立的?
