别再用 Redis List 实现消息队列了,Stream 专为队列而生
使用 Redis 的 List 實(shí)現(xiàn)消息隊(duì)列有很多局限性,比如:
沒有良好的 ACK 機(jī)制;
沒有 ConsumerGroup 消費(fèi)組概念;
消息堆積。
List 是線性結(jié)構(gòu),想要查詢指定數(shù)據(jù)需要遍歷整個(gè)列表;
Stream 是 Redis 5.0 引入的一種專門為消息隊(duì)列設(shè)計(jì)的數(shù)據(jù)類型,Stream 是一個(gè)包含 0 個(gè)或者多個(gè)元素的有序隊(duì)列,這些元素根據(jù) ID 的大小進(jìn)行有序排列。
它實(shí)現(xiàn)了大部分消息隊(duì)列的功能:
消息 ID 系列化生成;
消息遍歷;
消息的阻塞和非阻塞讀;
Consumer Groups 消費(fèi)組;
ACK 確認(rèn)機(jī)制。
支持多播。
提供了很多消息隊(duì)列操作命令,并且借鑒 Kafka 的 Consumer Groups 的概念,提供了消費(fèi)組功能。
同時(shí)提供了消息的持久化和主從復(fù)制機(jī)制,客戶端可以訪問任何時(shí)刻的數(shù)據(jù),并且能記住每一個(gè)客戶端的訪問位置,從而保證消息不丟失。
廢話少說,先來看下如何使用,官網(wǎng)文檔詳見:https://redis.io/topics/streams-intro
?
XADD:插入消息
「云嵐宗眾弟子聽命,擊殺蕭炎!」
當(dāng)云山最后一字落下,那彌漫的緊繃氣氛,頓時(shí)宣告破碎,懸浮半空的眾多云嵐宗長老背后雙翼一振,便是咻咻的劃過天際,追殺蕭炎。
云山使用以下指令向隊(duì)列中插入「追殺蕭炎」命令,讓長老帶領(lǐng)子弟去執(zhí)行。
XADD 云嵐宗?* task kill name 蕭炎 "1645936602161-0"Stream 中的每個(gè)元素由鍵值對(duì)的形式組成,不同元素可以包含不同數(shù)量的鍵值對(duì)。
該命令的語法如下:
XADD?streamName?id?field?value?[field?value?...]消息隊(duì)列名稱后面的 「*」 ,表示讓 Redis 為插入的消息自動(dòng)生成唯一 ID,當(dāng)然也可以自己定義。
消息 ID 由兩部分組成:
當(dāng)前毫秒內(nèi)的時(shí)間戳;
順序編號(hào)。從 0 為起始值,用于區(qū)分同一時(shí)間內(nèi)產(chǎn)生的多個(gè)命令。
?通過將元素 ID 與時(shí)間進(jìn)行關(guān)聯(lián),并強(qiáng)制要求新元素的 ID 必須大于舊元素的 ID, Redis 從邏輯上將流變成了一種只執(zhí)行追加操作(append only)的數(shù)據(jù)結(jié)構(gòu)。
這種特性對(duì)于使用流實(shí)現(xiàn)消息隊(duì)列和事件系統(tǒng)的用戶來說是非常重要的:
用戶可以確信,新的消息和事件只會(huì)出現(xiàn)在已有消息和事件之后,就像現(xiàn)實(shí)世界里新事件總是發(fā)生在已有事件之后一樣,一切都是有序進(jìn)行的。
?
XREAD:讀取消息
云凌老狗使用如下指令接收云山的命令:
XREAD?COUNT?1?BLOCK?0?STREAMS?云嵐宗?0-0 1)?1)?"\xe4\xba\x91\xe5\xb2\x9a\xe5\xae\x97"2)?1)?1)?"1645936602161-0"2)?1)?"task"2)?"kill"3)?"name"4)?"蕭炎"?#?蕭炎XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
該指令可以同時(shí)對(duì)多個(gè)流進(jìn)行讀取,每個(gè)心法對(duì)應(yīng)含義如下:
COUNT:表示每個(gè)流中最多讀取的元素個(gè)數(shù);
BLOCK:阻塞讀取,當(dāng)消息隊(duì)列沒有消息的時(shí)候,則阻塞等待, 0 表示無限等待,單位是毫秒。
ID:消息 ID,在讀取消息的時(shí)候可以指定 ID,并從這個(gè) ID 的下一條消息開始讀取,0-0 則表示從第一個(gè)元素開始讀取。
如果想使用 XREAD 進(jìn)行順序消費(fèi),每次讀取后要記住返回的消息 ID,下次調(diào)用 XREAD 就將上一次返回的消息 ID 作為參數(shù)傳遞到下一次調(diào)用就可以繼續(xù)消費(fèi)后續(xù)的消息了。
?云韻宗主,我今天剛到云嵐宗,歷史的消息就不接了,只想接收我使用 XREAD 阻塞等待的那一刻開始通過 XADD 發(fā)布的消息要咋整?
運(yùn)行「」心法即可,心法的最后「」符號(hào)表示讀取最新的阻塞消息,讀取不到則一直死等。
等待過程中,其他長老向隊(duì)列追加消息,則會(huì)立即讀取到。
XREAD?COUNT?1?BLOCK?0?STREAMS?云嵐宗?$?這么容易就實(shí)現(xiàn)消息隊(duì)列了么?說好的 ACK 機(jī)制呢?
這里只是開胃菜,通過 XREAD 讀取的數(shù)據(jù)其實(shí)并沒有被刪除,當(dāng)重新執(zhí)行 XREAD COUNT 2 BLOCK 0 STREAMS 云嵐宗 0-0 指令的時(shí)候又會(huì)重新讀取到。
所以我們還需要 ACK 機(jī)制,
接下來,我們來一個(gè)真正的消息隊(duì)列。
?
ConsumerGroup
Redis Stream 的 ConsumerGroup(消費(fèi)者組)允許用戶將一個(gè)流從邏輯上劃分為多個(gè)不同的流,并讓 ConsumerGroup 的消費(fèi)者去處理。
它是一個(gè)強(qiáng)大的支持多播的可持久化的消息隊(duì)列。Redis Stream 借鑒了 Kafka 的設(shè)計(jì)。
Stream 的高可用是建立主從復(fù)制基礎(chǔ)上的,它和其它數(shù)據(jù)結(jié)構(gòu)的復(fù)制機(jī)制沒有區(qū)別,也就是說在 Sentinel 和 Cluster 集群環(huán)境下 Stream 是可以支持高可用的。
Redis-StreamRedis Stream 的結(jié)構(gòu)如上圖所示。有一個(gè)消息鏈表,每個(gè)消息都有一個(gè)唯一的 ID 和對(duì)應(yīng)的內(nèi)容;
消息持久化;
每個(gè)消費(fèi)組的狀態(tài)是獨(dú)立的,不不影響,同一份的 Stream 消息會(huì)被所有的消費(fèi)組消費(fèi);
一個(gè)消費(fèi)組可以由多個(gè)消費(fèi)者組成,消費(fèi)者之間是競(jìng)爭(zhēng)關(guān)系,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使 last_deliverd_id 往前移動(dòng);
每個(gè)消費(fèi)者有一個(gè) pending_ids 變量,用于記錄當(dāng)前消費(fèi)者讀取了但是還沒 ack 的消息。它用來保證消息至少被客戶端消費(fèi)了一次。
消費(fèi)組實(shí)現(xiàn)的消息隊(duì)列主要涉及以下三個(gè)指令:
XGROUP用于創(chuàng)建、銷毀和管理消費(fèi)者組。
XREADGROUP通過消費(fèi)組從流中讀取數(shù)據(jù)。
XACK是允許消費(fèi)者將待處理消息標(biāo)記為已正確處理的命令。
創(chuàng)建消費(fèi)組
Stream 通過 XGROUP CREATE 指令創(chuàng)建消費(fèi)組 (Consumer Group),需要傳遞起始消息 ID 參數(shù)用來初始化 last_delivered_id 變量。
我們使用 XADD 往 bossStream 隊(duì)列插入一些消息:
XADD?bossStream?*?name?zhangsan?age?26 XADD?bossStream?*?name?lisi?age?2 XADD?bossStream?*?name?bigold?age?40如下指令,為消息隊(duì)列名為 bossStream 創(chuàng)建「青龍門」和「六扇門」兩個(gè)消費(fèi)組。
#?語法如下 #?XGROUP?CREATE?stream?group?start_id XGROUP?CREATE?bossStream?青龍門?0-0?MKSTREAM XGROUP?CREATE?bossStream?六扇門?0-0?MKSTREAMstream:指定隊(duì)列的名字;
group:指定消費(fèi)組名字;
start_id:指定消費(fèi)組在 Stream 中的起始 ID,它決定了消費(fèi)者組從哪個(gè) ID 之后開始讀取消息,0-0 從第一條開始讀取, $ 表示從最后一條向后開始讀取,只接收新消息。
MKSTREAM:默認(rèn)情況下,XGROUP CREATE命令在目標(biāo)流不存在時(shí)返回錯(cuò)誤。可以使用可選MKSTREAM子命令作為 之后的最后一個(gè)參數(shù)來自動(dòng)創(chuàng)建流。
讀取消息
讓「青龍門」消費(fèi)組的 consumer1 從bossStream 阻塞讀取一條消息:
XREADGROUP?GROUP?青龍門?consumer1?COUNT?1?BLOCK?0?STREAMS?bossStream?> 1)?1)?"bossStream"2)?1)?1)?"1645957821396-0"2)?1)?"name"2)?"zhangsan"3)?"age"4)?"26"語法如下:
XREADGROUP?GROUP?groupName?consumerName?[COUNT?n]?[BLOCK?ms]?STREAMS?streamName?[stream?...]?id?[id?...][] 內(nèi)的表示可選參數(shù),該命令與 XREAD 大同小異,區(qū)別在于新增 GROUP groupName consumerName 選項(xiàng)。
該選項(xiàng)的兩個(gè)參數(shù)分別用于指定被讀取的消費(fèi)者組以及負(fù)責(zé)處理消息的消費(fèi)者。
其中:
>:命令的最后參數(shù) >,表示從尚未被消費(fèi)的消息開始讀取;
BLOCK:阻塞讀取;
敲黑板了
如果消息隊(duì)列中的消息被消費(fèi)組的一個(gè)消費(fèi)者消費(fèi)了,這條消息就不會(huì)再被這個(gè)消費(fèi)組的其他消費(fèi)者讀取到。
比如 consumer2 執(zhí)行讀取操作:
XREADGROUP?GROUP?青龍門?consumer2?COUNT?1?BLOCK?0?STREAMS?bossStream?> 1)?1)?"bossStream"2)?1)?1)?"1645957838700-0"2)?1)?"name"2)?"lisi"3)?"age"4)?"2"consumer2 不能再讀取到 zhangsan 了,而是讀取下一條 lisi 因?yàn)檫@條消息已經(jīng)被 consumer1 讀取了。
使用消費(fèi)者的另一個(gè)目的可以讓組內(nèi)的多個(gè)消費(fèi)者分擔(dān)讀取消息,也就是每個(gè)消費(fèi)者讀取部分消息,從而實(shí)現(xiàn)均衡負(fù)載。
比如一個(gè)消費(fèi)組有三個(gè)消費(fèi)者 C1、C2、C3 和一個(gè)包含消息 1、2、3、4、5、6、7 的流:
XPENDING 查看已讀未確認(rèn)消息
為了保證消費(fèi)者在消費(fèi)的時(shí)候發(fā)生故障或者宕機(jī)重啟后依然可以讀取消息,Stream 內(nèi)部有一個(gè)隊(duì)列(pending List)保存每個(gè)消費(fèi)者讀取但是還沒有執(zhí)行 ACK 的消息。
如果消費(fèi)者使用了 XREADGROUP GROUP groupName consumerName 讀取消息,但是沒有給 Stream 發(fā)送 XACK 命令,消息依然保留。
比如查看 bossStream 中的 消費(fèi)組「青龍門」中各個(gè)消費(fèi)者已讀取未確認(rèn)的消息信息:
XPENDING?bossStream?青龍門 1)?(integer)?2 2)?"1645957821396-0" 3)?"1645957838700-0" 4)?1)?1)?"consumer1"2)?"1"2)?1)?"consumer2"2)?"1"1)未確認(rèn)消息條數(shù);
2) ~ 3)青龍門中所有消費(fèi)者讀取的消息最小和最大 ID;
查看 consumer1讀取了哪些數(shù)據(jù),使用以下命令:
XPENDING?bossStream?青龍門?-?+?10?consumer1 1)?1)?"1645957821396-0"2)?"consumer1"3)?(integer)?37583844)?(integer)?1ACK 確認(rèn)
所以當(dāng)接收到消息并且消費(fèi)成功以后,我們需要手動(dòng) ACK 通知 Streams,這條消息就會(huì)被刪除了。命令如下:
XACK?bossStream?青龍門?1645957821396-0?1645957838700-0 (integer)?2語法如下:
XACK key group-key ID [ID ...]
消費(fèi)確認(rèn)增加了消息的可靠性,一般在業(yè)務(wù)處理完成之后,需要執(zhí)行 ack 確認(rèn)消息已經(jīng)被消費(fèi)完成,整個(gè)流程的執(zhí)行如下圖所示:
Stream 整體流程?
使用 Redisson 實(shí)戰(zhàn)
使用 maven 添加依賴
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.16.7</version> </dependency>添加 Redis 配置,碼哥的 Redis 沒有配置密碼,大家根據(jù)實(shí)際情況配置即可。
spring:application:name:?redissionredis:host:?127.0.0.1port:?6379ssl:?false@Slf4j @Service public?class?QueueService?{@Autowiredprivate?RedissonClient?redissonClient;/***?發(fā)送消息到隊(duì)列**?@param?message*/public?void?sendMessage(String?message)?{RStream<String,?String>?stream?=?redissonClient.getStream("sensor#4921");stream.add("speed",?"19");stream.add("velocity",?"39%");stream.add("temperature",?"10C");}/***?消費(fèi)者消費(fèi)消息**?@param?message*/public?void?consumerMessage(String?message)?{RStream<String,?String>?stream?=?redissonClient.getStream("sensor#4921");stream.createGroup("sensors_data",?StreamMessageId.ALL);Map<StreamMessageId,?Map<String,?String>>?messages?=?stream.readGroup("sensors_data",?"consumer_1");for?(Map.Entry<StreamMessageId,?Map<String,?String>>?entry?:?messages.entrySet())?{Map<String,?String>?msg?=?entry.getValue();System.out.println(msg);stream.ack("sensors_data",?entry.getKey());}}}參考鏈接:
https://blog.51cto.com/u_15239532/2835962
https://redis.io/topics/streams-intro
https://redisson.org/articles/redis-streams-for-java.html
有道無術(shù),術(shù)可成;有術(shù)無道,止于術(shù)
歡迎大家關(guān)注Java之道公眾號(hào)
好文章,我在看??
總結(jié)
以上是生活随笔為你收集整理的别再用 Redis List 实现消息队列了,Stream 专为队列而生的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 复旦博士用130行代码搞定核酸统计,2分
- 下一篇: python集合常用方法_Python中