工作队列
work消息模型
工作隊(duì)列或者競(jìng)爭消費(fèi)者模式
在第一篇教程中,我們編寫了一個(gè)程序,從一個(gè)命名隊(duì)列中發(fā)送并接受消息。在這里,我們將創(chuàng)建一個(gè)工作隊(duì)列,在多個(gè)工作者之間分配耗時(shí)任務(wù)。
工作隊(duì)列,又稱任務(wù)隊(duì)列。主要思想就是避免執(zhí)行資源密集型任務(wù)時(shí),必須等待它執(zhí)行完成。相反我們稍后完成任務(wù),我們將任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。 在后臺(tái)運(yùn)行的工作進(jìn)程將獲取任務(wù)并最終執(zhí)行作業(yè)。當(dāng)你運(yùn)行許多消費(fèi)者時(shí),任務(wù)將在他們之間共享,但是一個(gè)消息只能被一個(gè)消費(fèi)者獲取。
這個(gè)概念在Web應(yīng)用程序中特別有用,因?yàn)樵诙痰腍TTP請(qǐng)求窗口中無法處理復(fù)雜的任務(wù)。
接下來我們來模擬這個(gè)流程:
P:生產(chǎn)者:任務(wù)的發(fā)布者C1:消費(fèi)者,領(lǐng)取任務(wù)并且完成任務(wù),假設(shè)完成速度較快C2:消費(fèi)者2:領(lǐng)取任務(wù)并完成任務(wù),假設(shè)完成速度慢面試題:避免消息堆積?
1)采用workqueue,多個(gè)消費(fèi)者監(jiān)聽同一隊(duì)列。
2)接收到消息以后,而是通過線程池,異步消費(fèi)。
?
生產(chǎn)者
生產(chǎn)者與案例1中的幾乎一樣:
public class Send {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循環(huán)發(fā)布任務(wù)for (int i = 0; i < 50; i++) {// 消息內(nèi)容String message = "task .. " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(i * 2);}// 關(guān)閉通道和連接channel.close();connection.close();} }不過這里我們是循環(huán)發(fā)送50條消息。
消費(fèi)者1
消費(fèi)者2
與消費(fèi)者1基本類似,就是沒有設(shè)置消費(fèi)耗時(shí)時(shí)間。
這里是模擬有些消費(fèi)者快,有些比較慢。
?
接下來,兩個(gè)消費(fèi)者一同啟動(dòng),然后發(fā)送50條消息:
可以發(fā)現(xiàn),兩個(gè)消費(fèi)者各自消費(fèi)了25條消息,而且各不相同,這就實(shí)現(xiàn)了任務(wù)的分發(fā)。
能者多勞
剛才的實(shí)現(xiàn)有問題嗎?
-  
消費(fèi)者1比消費(fèi)者2的效率要低,一次任務(wù)的耗時(shí)較長
 -  
然而兩人最終消費(fèi)的消息數(shù)量是一樣的
 -  
消費(fèi)者2大量時(shí)間處于空閑狀態(tài),消費(fèi)者1一直忙碌
 
現(xiàn)在的狀態(tài)屬于是把任務(wù)平均分配,正確的做法應(yīng)該是消費(fèi)越快的人,消費(fèi)的越多。
怎么實(shí)現(xiàn)呢?
我們可以使用basicQos方法和prefetchCount = 1設(shè)置。 這告訴RabbitMQ一次不要向工作人員發(fā)送多于一條消息。 或者換句話說,不要向工作人員發(fā)送新消息,直到它處理并確認(rèn)了前一個(gè)消息。 相反,它會(huì)將其分派給不是仍然忙碌的下一個(gè)工作人員。
再次測(cè)試:
?
總結(jié)
                            
                        - 上一篇: 消息确认机制ACK
 - 下一篇: 发布订阅之fanout