Redis之时间轮机制(五)
🚀 優質資源分享 🚀
| 🧡 Python實戰微信訂餐小程序 🧡 | 進階級 | 本課程是python flask+微信小程序的完美結合,從項目搭建到騰訊云部署上線,打造一個全棧訂餐系統。 |
| 💛Python量化交易實戰💛 | 入門級 | 手把手帶你打造一個易擴展、更安全、效率更高的量化交易系統 |
一、什么是時間輪
時間輪這個技術其實出來很久了,在kafka、zookeeper等技術中都有時間輪使用的方式。 時間輪是一種高效利用線程資源進行批量化調度的一種調度模型。把大批量的調度任務全部綁定到同一個調度器上,使用這一個調度器來進行所有任務的管理、觸發、以及運行。所以時間輪的模型能夠高效管理各種延時任務、周期任務、通知任務。 以后大家在工作中遇到類似的功能,可以采用時間輪機制。如下圖所示,時間輪,從圖片上來看,就和手表的表圈是一樣,所以稱為時間輪,是因為它是以時間作為刻度組成的一個環形隊列,這個環形隊列采用數組來實現,數組的每個元素稱為槽,每個槽可以放一個定時任務列表,叫HashedWheelBucket,它是一個雙向鏈表,量表的每一項表示一個定時任務項(HashedWhellTimeout),其中封裝了真正的定時任務TimerTask。時間輪是由多個時間格組成,下圖中有8個時間格,每個時間格代表當前時間輪的基本時間跨度(tickDuration),其中時間輪的時間格的個數是固定的。在下圖中,有8個時間格(槽),假設每個時間格的單位為1s,那么整個時間輪走完一圈需要8s鐘。每秒鐘指針會沿著順時針方向移動一個,這個單位可以設置,比如以秒為單位,可以以一小時為單位,這個單位可以代表時間精度。通過指針移動,來獲得每個時間格中的任務列表,然后遍歷這一個時間格中的雙向鏈表來執行任務,以此循環。
二、時間輪案例使用
這里使用的時間輪是Netty這個包中提供的,使用方法比較簡單。
- 先構建一個HashedWheelTimer時間輪。
- tickDuration: 100 ,表示每個時間格代表當前時間輪的基本時間跨度,這里是100ms,也就是指針100ms跳動一次,每次跳動一個窗格
- ticksPerWheel:1024,表示時間輪上一共有多少個窗格,分配的窗格越多,占用內存空間就越大
- leakDetection:是否開啟內存泄漏檢測。
- maxPendingTimeouts[可選參數],最大允許等待的任務數,默認沒有限制。
- 通過newTimeout()把需要延遲執行的任務添加到時間輪中
三、時間輪的原理解析
時間輪的整體原理,分為幾個部分。
創建時間輪
時間輪本質上是一個環狀數組,比如我們初始化時間輪時:ticksPerWheel=8,那么意味著這個環狀數組的長度是8,如圖3-12所示。
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];添加任務
- 當通過newTimeout()方法添加一個延遲任務時,該任務首先會加入到一個阻塞隊列中中。然后會有一個定時任務從該隊列獲取任務,添加到時間輪的指定位置,計算方法如下。
任務執行
- Worker線程按照每次間隔時間轉動后,得到該時間窗格中的任務鏈表,然后從鏈表的head開始逐個取出任務,有兩個判斷條件
- 當前任務需要轉動的圈數為0,表示任務是當前圈開始執行
- 當前任務達到了delay時間,也就是timeout.deadline <= deadline
- 最終調用timeout.expire()方法執行任務。
四、時間輪的源碼分析
HashedWheelTimer的構造
- 調用 createWheel 創建一個時間輪,時間輪數組一定是 2 的冪次方,比如傳入的 ticksPerWheel=6,那么初始化的 wheel 長度一定是 8,這樣是便于時間格的計算。
- tickDuration,表示時間輪的跨度,代表每個時間格的時間精度,以納秒的方式來表現。
- 把工作線程 Worker 封裝成 WorkerThread,從名字可以知道,它就是最終那個負責干活的線程。
- 對傳入的 ticksPerWheel 進行整形
- 初始化固定長度的 HashedWheelBucket
添加任務到時間輪
調用 newTimeout 方法,把任務添加進來。
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {if (task == null) {throw new NullPointerException("task");}if (unit == null) {throw new NullPointerException("unit");}//統計任務個數long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();//判斷最大任務數量是否超過限制if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts ("+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "+ "timeouts (" + maxPendingTimeouts + ")");}//如果時間輪沒有啟動,則通過start方法進行啟動start();// Add the timeout to the timeout queue which will be processed on the next tick.// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.//計算任務的延遲時間,通過當前的時間+當前任務執行的延遲時間-時間輪啟動的時間。long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;//在delay為正數的情況下,deadline是不可能為負數//如果為負數,那么說明超過了long的最大值if (delay > 0 && deadline < 0) {deadline = Long.MAX\_VALUE;}//創建一個Timeout任務,理論上來說,這個任務應該要加入到時間輪的時間格子中,但是這里并不是先添加到時間格,而是先//加入到一個阻塞隊列,然后等到時間輪執行到下一個格子時,再從隊列中取出最多100000個任務添加到指定的時間格(槽)中。HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout; }start
任務添加到阻塞隊列之后,我們再來看啟動方法。
start 方法會根據當前的 workerState 狀態來啟動時間輪。并且用了 startTimeInitialized 來控制線程的運行,如果 workerThread 沒有啟動起來,那么 newTimeout 方法會一直阻塞在運行 start 方法中。如果不阻塞,newTimeout 方法會獲取不到 startTime。
public void start() {//workerState一開始的時候是0(WORKER\_STATE\_INIT),然后才會設置為1(WORKER\_STATE\_STARTED)switch (WORKER\_STATE\_UPDATER.get(this)) {case WORKER\_STATE\_INIT:if (WORKER\_STATE\_UPDATER.compareAndSet(this, WORKER\_STATE\_INIT, WORKER\_STATE\_STARTED)) {workerThread.start();}break;case WORKER\_STATE\_STARTED:break;case WORKER\_STATE\_SHUTDOWN:throw new IllegalStateException("cannot be started once stopped");default:throw new Error("Invalid WorkerState");}// 等待worker線程初始化時間輪的啟動時間while (startTime == 0) {try {//這里使用countDownLauch來確保調度的線程已經被啟動startTimeInitialized.await();} catch (InterruptedException ignore) {// Ignore - it will be ready very soon.}} }啟動時間輪
調用 start()方法, 會調用?workerThread.start();?來啟動一個工作線程,這個工作線程是在構造方法中初始化的,包裝的是一個 Worker 內部線程類。
所以直接進入到 Worker 這個類的 run 方法,了解下它的設計邏輯:
public void run() {// 初始化startTime,表示時間輪的啟動時間startTime = System.nanoTime();if (startTime == 0) {// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.startTime = 1;} // 喚醒被阻塞的start()方法。startTimeInitialized.countDown();do {//返回每tick一次的時間間隔final long deadline = waitForNextTick();if (deadline > 0) {//計算時間輪的槽位int idx = (int) (tick & mask);//移除掉CancelledTaskprocessCancelledTasks();//得到當前指針位置的時間槽HashedWheelBucket bucket =wheel[idx];//將newTimeout()方法中加入到待處理定時任務隊列中的任務加入到指定的格子中transferTimeoutsToBuckets();//運行目前指針指向的槽中的bucket鏈表中的任務bucket.expireTimeouts(deadline);tick++;}} while (WORKER\_STATE\_UPDATER.get(HashedWheelTimer.this) == WORKER\_STATE\_STARTED);//如果Worker\_State一只是started狀態,就一直循環// Fill the unprocessedTimeouts so we can return them from stop() method.for (HashedWheelBucket bucket : wheel) {bucket.clearTimeouts(unprocessedTimeouts); //清除時間輪中不需要處理的任務}for (; ; ) {//遍歷任務隊列,發現如果有任務被取消,則添加到unprocessedTimeouts,也就是不需要處理的隊列中。HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}}//處理被取消的任務.processCancelledTasks(); }時間輪指針跳動
這個方法的主要作用就是返回下一個指針指向的時間間隔,然后進行 sleep 操作。
大家可以想象一下,一個鐘表上秒與秒之間是有時間間隔的,那么 waitForNextTick 就是根據當前時間計算出跳動到下個時間的時間間隔,然后進行 sleep,然后再返回當前時間距離時間輪啟動時間的時間間隔。
說得再直白一點:,假設當前的 tickDuration 的間隔是 1s,tick 默認 = 0, 此時第一次進來,得到的 deadline=1,也就是下一次跳動的時間間隔是 1s。假設當前處于:
private long waitForNextTick() {//tick表示總的tick數//tickDuration表示每個時間格的跨度,所以deadline返回的是下一次時間輪指針跳動的時間long deadline = tickDuration * (tick + 1);for (; ; ) {//計算當前時間距離啟動時間的時間間隔final long currentTime = System.nanoTime() - startTime;//通過下一次指針跳動的延遲時間距離當前時間的差額,這個作為sleep時間使用。// 其實線程是以睡眠一定的時候再來執行下一個ticket的任務的long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;//sleepTimeMs小于零表示走到了下一個時間槽位置if (sleepTimeMs <= 0) {if (currentTime == Long.MIN\_VALUE) {return -Long.MAX\_VALUE;} else {return currentTime;}}if (isWindows()) {sleepTimeMs = sleepTimeMs / 10 * 10;}//進入到這里進行sleep,表示當前時間距離下一次tick時間還有一段距離,需要sleep。try {Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {if (WORKER\_STATE\_UPDATER.get(HashedWheelTimer.this) == WORKER\_STATE\_SHUTDOWN) {return Long.MIN\_VALUE;}}} }transferTimeoutsToBuckets
轉移任務到時間輪中,前面我們講過,任務添加進來時,是先放入到阻塞隊列。
而在現在這個方法中,就是把阻塞隊列中的數據轉移到時間輪的指定位置。
在這個轉移方法中,寫死了一個循環,每次都只轉移 10 萬個任務。
然后根據 HashedWheelTimeout 的 deadline 延遲時間計算出時間輪需要運行多少次才能運行當前的任務,如果當前的任務延遲時間大于時間輪跑一圈所需要的時間,那么就計算需要跑幾圈才能到這個任務運行。
最后計算出該任務在時間輪中的槽位,添加到時間輪的鏈表中。
private void transferTimeoutsToBuckets() {// 循環100000次,也就是每次轉移10w個任務for (int i = 0; i < 100000; i++) {//從阻塞隊列中獲得具體的任務HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// all processedbreak;}if (timeout.state() == HashedWheelTimeout.ST\_CANCELLED) {// Was cancelled in the meantime.continue;}//計算tick次數,deadline表示當前任務的延遲時間,tickDuration表示時間槽的間隔,兩者相除就可以計算當前任務需要tick幾次才能被執行long calculated = timeout.deadline / tickDuration;// 計算剩余的輪數, 只有 timer 走夠輪數, 并且到達了 task 所在的 slot, task 才會過期.(被執行)timeout.remainingRounds = (calculated - tick) / wheel.length;//如果任務在timeouts隊列里面放久了, 以至于已經過了執行時間, 這個時候就使用當前tick, 也就是放到當前bucket, 此方法調用完后就會被執行final long ticks = Math.max(calculated, tick);// 算出任務應該插入的 wheel 的 slot, stopIndex = tick 次數 & mask, mask = wheel.length - 1int stopIndex = (int) (ticks & mask);//把timeout任務插入到指定的bucket鏈中。HashedWheelBucket bucket = wheel[stopIndex];bucket.addTimeout(timeout);} }運行時間輪中的任務
當指針跳動到某一個時間槽中時,會就觸發這個槽中的任務的執行。該功能是通過 expireTimeouts 來實現
這個方法的主要作用是: 過期并執行格子中到期的任務。也就是當 tick 進入到指定格子時,worker 線程會調用這個方法
HashedWheelBucket 是一個鏈表,所以我們需要從 head 節點往下進行遍歷。如果鏈表沒有遍歷到鏈表尾部那么就繼續往下遍歷。
獲取的 timeout 節點節點,如果剩余輪數 remainingRounds 大于 0,那么就說明要到下一圈才能運行,所以將剩余輪數減一;
如果當前剩余輪數小于等于零了,那么就將當前節點從 bucket 鏈表中移除,并判斷一下當前的時間是否大于 timeout 的延遲時間,如果是則調用 timeout 的 expire 執行任務。
void expireTimeouts(long deadline) {HashedWheelTimeout timeout = head;// 遍歷當前時間槽中的所有任務while (timeout != null) {HashedWheelTimeout next = timeout.next;//如果當前任務要被執行,那么remainingRounds應該小于或者等于0if (timeout.remainingRounds <= 0) {//從bucket鏈表中移除當前timeout,并返回鏈表中下一個timeoutnext = remove(timeout);//如果timeout的時間小于當前的時間,那么就調用expire執行taskif (timeout.deadline <= deadline) {timeout.expire();} else {//不可能發生的情況,就是說round已經為0了,deadline卻>當前槽的deadline// The timeout was placed into a wrong slot. This should never happen.throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}} else if (timeout.isCancelled()) {next = remove(timeout);} else {//因為當前的槽位已經過了,說明已經走了一圈了,把輪數減一timeout.remainingRounds--;}//把指針放置到下一個timeouttimeout = next;} }總結
以上是生活随笔為你收集整理的Redis之时间轮机制(五)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Yolov5训练自己的数据集+Tenso
- 下一篇: java一般自学多久