发布订阅之topics
訂閱模型-Topic
Topic類型的Exchange與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符!
Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert
通配符規則:
`#`:匹配一個或多個詞`*`:匹配不多不少恰好1個詞舉例:
`audit.#`:能夠匹配`audit.irs.corporate` 或者 `audit.irs``audit.*`:只能匹配`audit.irs`在這個例子中,我們將發送所有描述動物的消息。消息將使用由三個字(兩個點)組成的routing key發送。路由關鍵字中的第一個單詞將描述速度,第二個顏色和第三個種類:“<speed>.<color>.<species>”。
我們創建了三個綁定:Q1綁定了綁定鍵“* .orange.”,Q2綁定了“.*.rabbit”和“lazy.#”。
Q1匹配所有的橙色動物。
Q2匹配關于兔子以及懶惰動物的消息。
?
練習,生產者發送如下消息,會進入那個隊列:
quick.orange.rabbit Q1 Q2
lazy.orange.elephant
quick.orange.fox
lazy.pink.rabbit
quick.brown.fox
quick.orange.male.rabbit
orange
?
生產者
使用topic類型的Exchange,發送消息的routing key有3種: item.isnert、item.update、item.delete:
public class Send {private final static String EXCHANGE_NAME = "topic_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明exchange,指定類型為topicchannel.exchangeDeclare(EXCHANGE_NAME, "topic");// 消息內容String message = "新增商品 : id = 1001";// 發送消息,并且指定routing key 為:insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());System.out.println(" [商品服務:] Sent '" + message + "'");channel.close();connection.close();} }消費者1
我們此處假設消費者1只接收兩種類型的消息:更新商品和刪除商品
public class Recv {private final static String QUEUE_NAME = "topic_exchange_queue_1";private final static String EXCHANGE_NAME = "topic_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機,同時指定需要訂閱的routing key。需要 update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費者1] received : " + msg + "!");}};// 監聽隊列,自動ACKchannel.basicConsume(QUEUE_NAME, true, consumer);} }消費者2
我們此處假設消費者2接收所有類型的消息:新增商品,更新商品和刪除商品。
/*** 消費者2*/ public class Recv2 {private final static String QUEUE_NAME = "topic_exchange_queue_2";private final static String EXCHANGE_NAME = "topic_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機,同時指定需要訂閱的routing key。訂閱 insert、update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費者2] received : " + msg + "!");}};// 監聽隊列,自動ACKchannel.basicConsume(QUEUE_NAME, true, consumer);} }?
總結
以上是生活随笔為你收集整理的发布订阅之topics的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: 发布订阅之direct
 - 下一篇: 持久化