发布订阅之fanout
訂閱模型分類
在之前的模式中,我們創(chuàng)建了一個(gè)工作隊(duì)列。 工作隊(duì)列背后的假設(shè)是:每個(gè)任務(wù)只被傳遞給一個(gè)工作人員。 在這一部分,我們將做一些完全不同的事情 - 我們將會(huì)傳遞一個(gè)信息給多個(gè)消費(fèi)者。 這種模式被稱為“發(fā)布/訂閱”。
訂閱模型示意圖:
解讀:
1、1個(gè)生產(chǎn)者,多個(gè)消費(fèi)者
2、每一個(gè)消費(fèi)者都有自己的一個(gè)隊(duì)列
3、生產(chǎn)者沒有將消息直接發(fā)送到隊(duì)列,而是發(fā)送到了交換機(jī)
4、每個(gè)隊(duì)列都要綁定到交換機(jī)
5、生產(chǎn)者發(fā)送的消息,經(jīng)過交換機(jī)到達(dá)隊(duì)列,實(shí)現(xiàn)一個(gè)消息被多個(gè)消費(fèi)者獲取的目的
X(Exchanges):交換機(jī)一方面:接收生產(chǎn)者發(fā)送的消息。另一方面:知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
Exchange類型有以下幾種:
Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列Direct:定向,把消息交給符合指定routing key 的隊(duì)列 Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列我們這里先學(xué)習(xí)
Fanout:即廣播模式Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒有任何隊(duì)列與Exchange綁定,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!
訂閱模型-Fanout
Fanout,也稱為廣播。
流程圖:
在廣播模式下,消息發(fā)送流程是這樣的:
-  
1) 可以有多個(gè)消費(fèi)者
 -  
2) 每個(gè)消費(fèi)者有自己的queue(隊(duì)列)
 -  
3) 每個(gè)隊(duì)列都要綁定到Exchange(交換機(jī))
 -  
4) 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī),交換機(jī)來決定要發(fā)給哪個(gè)隊(duì)列,生產(chǎn)者無法決定。
 -  
5) 交換機(jī)把消息發(fā)送給綁定過的所有隊(duì)列
 -  
6) 隊(duì)列的消費(fèi)者都能拿到消息。實(shí)現(xiàn)一條消息被多個(gè)消費(fèi)者消費(fèi)
 
生產(chǎn)者
兩個(gè)變化:
-  
1) 聲明Exchange,不再聲明Queue
 -  
2) 發(fā)送消息到Exchange,不再發(fā)送到Queue
 
消費(fèi)者1
public class Recv {private final static String QUEUE_NAME = "fanout_exchange_queue_1";private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊(duì)列到交換機(jī)channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 定義隊(duì)列的消費(fèi)者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費(fèi)者1] received : " + msg + "!");}};// 監(jiān)聽隊(duì)列,自動(dòng)返回完成channel.basicConsume(QUEUE_NAME, true, consumer);} }要注意代碼中:隊(duì)列需要和交換機(jī)綁定
消費(fèi)者2
public class Recv2 {private final static String QUEUE_NAME = "fanout_exchange_queue_2";private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊(duì)列到交換機(jī)channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 定義隊(duì)列的消費(fèi)者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費(fèi)者2] received : " + msg + "!");}};// 監(jiān)聽隊(duì)列,手動(dòng)返回完成channel.basicConsume(QUEUE_NAME, true, consumer);} }測(cè)試
我們運(yùn)行兩個(gè)消費(fèi)者,然后發(fā)送1條消息:
?
總結(jié)
以上是生活随笔為你收集整理的发布订阅之fanout的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: 工作队列
 - 下一篇: 发布订阅之direct