PriorityBlockingQueue源码
介紹
一個支持線程優先級排序的無界隊列,默認自然序進行排序,也可以自定義實現compareTo()方法來指定元素排序規則,不能保證同優先級元素的順序。隊列中的元素必須是可比較的,即實現Comparable接口,或者在構建函數時提供可對隊列元素進行比較的Comparator對象。不可以放null,會報空指針異常。
數據結構
PriorityBlockingQueue內部使用heap做為存儲結構,如下圖:
二叉樹存入數組的方式很簡單,就是從上到下,從左到右。PriorityQueue的是一個有特點的完全二叉樹,且不允許出現null節點,其父節點都比葉子節點小,這個是堆排序中的小頂堆。如果按數組順序我們可以得到如下結論:
- 左葉子節點=父節點下標*2+1
- 右葉子節點=父節點下標*2+2
- 父節點=(葉子節點-1)/2
加入節點:
新加入的元素x可能會破壞小頂堆的性質,因此需要進行調整。調整的過程為:從k指定的位置開始,將x逐層與當前點的parent進行比較并交換,直到滿足x >= queue[parent]為止
獲取元素
由于堆用數組表示,根據下標關系,0下標處的那個元素既是堆頂元素。所以直接返回數組0下標處的那個元素即可。
刪除第一個元素
從k指定的位置開始,將x逐層向下與當前點的左右孩子中較小的那個交換,直到x小于或等于左右孩子中的任何一個為止。
刪除任意一個元素
由于刪除操作會改變隊列結構,所以要進行調整;又由于刪除元素的位置可能是任意的,所以調整過程比其它函數稍加繁瑣。具體來說,remove(Object o)可以分為2種情況:1. 刪除的是最后一個元素。直接刪除即可,不需要調整。2. 刪除的不是最后一個元素,從刪除點開始以最后一個元素為參照調用一次siftDown()即可
?
主要屬性
/*** 空間大小默認值:11.*/private static final int DEFAULT_INITIAL_CAPACITY = 11;/*** 空間大小最大值:Integer.MAX_VALUE - 8.*/private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;/*** 隊列元素數組。平衡二叉堆實現,父節點下標是n,左節點則是2n+1,右節點是2n+2。最小的元素在最前面,元素通過comparator比較。*/private transient Object[] queue;/*** 入隊元素個數*/private transient int size;/*** The comparator, or null 表示自然排序*/private transient Comparator<? super E> comparator;/*** Lock used for all public operations*/private final ReentrantLock lock;/*** Condition for blocking when empty*/private final Condition notEmpty;/***擴容數組分配資源時的自旋鎖,CAS需要*/private transient volatile int allocationSpinLock;/***只用于序列化的時候,為了兼容之前的版本。只有在序列化和反序列化的時候不為null。*/private PriorityQueue<E> q;方法實現
offer,poll,peek
public boolean offer(E e) {if (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;//如果入隊數量大于或者等于heap大小,則擴容while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}public boolean offer(E e, long timeout, TimeUnit unit) {return offer(e); // never need to block}public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return dequeue();} finally {lock.unlock();}}public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null && nanos > 0)nanos = notEmpty.awaitNanos(nanos);} finally {lock.unlock();}return result;}public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return (size == 0) ? null : (E) queue[0];} finally {lock.unlock();}}?put,take
public void put(E e) {offer(e); // never need to block}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null)notEmpty.await();} finally {lock.unlock();}return result;}enqueue,dequeue
private E dequeue() {int n = size - 1;if (n < 0)return null;else {Object[] array = queue;//array[0]即為隊首。E result = (E) array[0];//最后一個元素E x = (E) array[n];array[n] = null;Comparator<? super E> cmp = comparator;//把最后一個元素放置在0位置。并進行下沉。if (cmp == null)siftDownComparable(0, x, array, n);elsesiftDownUsingComparator(0, x, array, n, cmp);size = n;return result;}} private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (key.compareTo((T) e) >= 0)break;array[k] = e;k = parent;}array[k] = key;}private static <T> void siftUpUsingComparator(int k, T x, Object[] array,Comparator<? super T> cmp) {while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (cmp.compare(x, (T) e) >= 0)break;array[k] = e;k = parent;}array[k] = x;}?up,down
/*** 在位置k處插入x。一直向root方向up,直到大于等于等于它的父親* @param k the position to fill* @param x the item to insert* @param array the heap array*/ private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {//獲取父親元素下標。int parent = (k - 1) >>> 1;Object e = array[parent];//不比父親元素小,則退出if (key.compareTo((T) e) >= 0)break;array[k] = e;k = parent;}//在位置k處插入xarray[k] = key;}private static <T> void siftUpUsingComparator(int k, T x, Object[] array,Comparator<? super T> cmp) {while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (cmp.compare(x, (T) e) >= 0)break;array[k] = e;k = parent;}array[k] = x;} /*** 在k處插入x,將x逐層向下與當前點的左右孩子中較小的那個交換,直到x小于或等于左右孩子中的任何一個為止* @param k the position to fill* @param x the item to insert* @param array the heap array* @param n heap size*/private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {if (n > 0) {Comparable<? super T> key = (Comparable<? super T>)x;int half = n >>> 1; // k <half,才可能有子節點。while (k < half) {int child = (k << 1) + 1; // assume left child is leastObject c = array[child];int right = child + 1;//有右孩子,并且右孩子值小于或者右孩子,則與右孩子進行交換if (right < n &&((Comparable<? super T>) c).compareTo((T) array[right]) > 0)c = array[child = right];//如果值小于孩子,直接退出if (key.compareTo((T) c) <= 0)break;//否則,k處放置孩子的值,k設置為孩子的位置array[k] = c;k = child;}array[k] = key;}}private static <T> void siftDownUsingComparator(int k, T x, Object[] array,int n,Comparator<? super T> cmp) {if (n > 0) {int half = n >>> 1;while (k < half) {int child = (k << 1) + 1;Object c = array[child];int right = child + 1;if (right < n && cmp.compare((T) c, (T) array[right]) > 0)c = array[child = right];if (cmp.compare(x, (T) c) <= 0)break;array[k] = c;k = child;}array[k] = x;}}?
private void removeAt(int i) {Object[] array = queue;int n = size - 1;if (n == i) // 最后一個元素array[i] = null;else {E moved = (E) array[n];array[n] = null;Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(i, moved, array, n);elsesiftDownUsingComparator(i, moved, array, n, cmp);//如果是最后一個元素移動到i,說明未下層,則UPif (array[i] == moved) {if (cmp == null)siftUpComparable(i, moved, array);elsesiftUpUsingComparator(i, moved, array, cmp);}}size = n;}注意
- 所有入庫操作,例如offer,put等都不會阻塞,因為隊列是無界的。
參考
- ReentrantLock源碼
- Java并發包--阻塞隊列(BlockingQueue)
總結
以上是生活随笔為你收集整理的PriorityBlockingQueue源码的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ArrayBlockingQueue源码
- 下一篇: DelayQueue源码