发布订阅之direct
訂閱模型-Direct
有選擇性的接收消息
在訂閱模式中,生產者發布消息,所有消費者都可以獲取所有消息。
在路由模式中,我們將添加一個功能 - 我們將只能訂閱一部分消息。 例如,我們只能將重要的錯誤消息引導到日志文件(以節省磁盤空間),同時仍然能夠在控制臺上打印所有日志消息。
但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。
在Direct模型下,隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
消息的發送方在向Exchange發送消息時,也必須指定消息的routing key。
P:生產者,向Exchange發送消息,發送消息時,會指定一個routing key。
X:Exchange(交換機),接收生產者的消息,然后把消息遞交給 與routing key完全匹配的隊列
C1:消費者,其所在隊列指定了需要routing key 為 error 的消息
C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息
?
生產者
此處我們模擬商品的增刪改,發送消息的RoutingKey分別是:insert、update、delete
public class Send {private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明exchange,指定類型為directchannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息內容String message = "商品新增了, id = 1001";// 發送消息,并且指定routing key 為:insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());System.out.println(" [商品服務:] Sent '" + message + "'");channel.close();connection.close();} }消費者1
我們此處假設消費者1只接收兩種類型的消息:更新商品和刪除商品。
public class Recv {private final static String QUEUE_NAME = "direct_exchange_queue_1";private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete消息channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費者1] received : " + msg + "!");}};// 監聽隊列,自動ACKchannel.basicConsume(QUEUE_NAME, true, consumer);} }消費者2
我們此處假設消費者2接收所有類型的消息:新增商品,更新商品和刪除商品。
public class Recv2 {private final static String QUEUE_NAME = "direct_exchange_queue_2";private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機,同時指定需要訂閱的routing key。訂閱 insert、update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費者2] received : " + msg + "!");}};// 監聽隊列,自動ACKchannel.basicConsume(QUEUE_NAME, true, consumer);} }測試
我們分別發送增、刪、改的RoutingKey,發現結果:
?
總結
以上是生活随笔為你收集整理的发布订阅之direct的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 发布订阅之fanout
- 下一篇: 发布订阅之topics