RabbitMQ中7种消息队列和保姆级代码演示!
blog.csdn.net/qq_32828253/article/details/110450249
七種模式介紹與應用場景
簡單模式(Hello World)
做最簡單的事情,一個生產者對應一個消費者,RabbitMQ相當于一個消息代理,負責將A的消息轉發給B
應用場景: 將發送的電子郵件放到消息隊列,然后郵件服務在隊列中獲取郵件并發送給收件人
工作隊列模式(Work queues)
在多個消費者之間分配任務(競爭的消費者模式),一個生產者對應多個消費者,一般適用于執行資源密集型任務,單個消費者處理不過來,需要多個消費者進行處理
應用場景: 一個訂單的處理需要10s,有多個訂單可以同時放到消息隊列,然后讓多個消費者同時處理,這樣就是并行了,而不是單個消費者的串行情況
訂閱模式(Publish/Subscribe)
一次向許多消費者發送消息,一個生產者發送的消息會被多個消費者獲取,也就是將消息將廣播到所有的消費者中。
應用場景: 更新商品庫存后需要通知多個緩存和多個數據庫,這里的結構應該是:
一個fanout類型交換機扇出兩個個消息隊列,分別為緩存消息隊列、數據庫消息隊列
一個緩存消息隊列對應著多個緩存消費者
一個數據庫消息隊列對應著多個數據庫消費者
路由模式(Routing)
有選擇地(Routing key)接收消息,發送消息到交換機并且要指定路由key ,消費者將隊列綁定到交換機時需要指定路由key,僅消費指定路由key的消息
應用場景: 如在商品庫存中增加了1臺iphone12,iphone12促銷活動消費者指定routing key為iphone12,只有此促銷活動會接收到消息,其它促銷活動不關心也不會消費此routing key的消息
主題模式(Topics)
根據主題(Topics)來接收消息,將路由key和某模式進行匹配,此時隊列需要綁定在一個模式上,#匹配一個詞或多個詞,*只匹配一個詞。
應用場景: 同上,iphone促銷活動可以接收主題為iphone的消息,如iphone12、iphone13等
遠程過程調用(RPC)
如果我們需要在遠程計算機上運行功能并等待結果就可以使用RPC,具體流程可以看圖。應用場景:需要等待接口返回數據,如訂單支付
發布者確認(Publisher Confirms)
與發布者進行可靠的發布確認,發布者確認是RabbitMQ擴展,可以實現可靠的發布。在通道上啟用發布者確認后,RabbitMQ將異步確認發送者發布的消息,這意味著它們已在服務器端處理。
應用場景: 對于消息可靠性要求較高,比如錢包扣款
代碼演示
代碼中沒有對后面兩種模式演示,有興趣可以自己研究
簡單模式
import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Sender?{private?final?static?String?QUEUE_NAME?=?"simple_queue";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();//?聲明隊列// queue:隊列名// durable:是否持久化// exclusive:是否排外??即只允許該channel訪問該隊列???一般等于true的話用于一個隊列只能有一個消費者來消費的場景// autoDelete:是否自動刪除??消費完刪除// arguments:其他屬性channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//消息內容String?message?=?"simplest?mode?message";channel.basicPublish("",?QUEUE_NAME,?null,?message.getBytes());System.out.println("[x]Sent?'"?+?message?+?"'");//最后關閉通關和連接channel.close();connection.close();} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver?{private?final?static?String?QUEUE_NAME?=?"simplest_queue";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{//?獲取連接ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} }工作隊列模式
import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver1?{private?final?static?String?QUEUE_NAME?=?"queue_work";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?同一時刻服務器只會發送一條消息給消費者channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver2?{private?final?static?String?QUEUE_NAME?=?"queue_work";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?同一時刻服務器只會發送一條消息給消費者channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Sender?{private?final?static?String?QUEUE_NAME?=?"queue_work";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();//?聲明隊列channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);for?(int?i?=?0;?i?<?100;?i++)?{String?message?=?"work?mode?message"?+?i;channel.basicPublish("",?QUEUE_NAME,?null,?message.getBytes());System.out.println("[x]?Sent?'"?+?message?+?"'");Thread.sleep(i?*?10);}channel.close();connection.close();} }發布訂閱模式
import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;public?class?Receive1?{private?static?final?String?EXCHANGE_NAME?=?"logs";public?static?void?main(String[]?argv)?throws?Exception?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");String?queueName?=?channel.queueDeclare().getQueue();channel.queueBind(queueName,?EXCHANGE_NAME,?"");System.out.println("?[*]?Waiting?for?messages.?To?exit?press?CTRL+C");//?訂閱消息的回調函數DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+?message?+?"'");};//?消費者,有消息時出發訂閱回調函數channel.basicConsume(queueName,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;public?class?Receive2?{private?static?final?String?EXCHANGE_NAME?=?"logs";public?static?void?main(String[]?argv)?throws?Exception?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");String?queueName?=?channel.queueDeclare().getQueue();channel.queueBind(queueName,?EXCHANGE_NAME,?"");System.out.println("?[*]?Waiting?for?messages.?To?exit?press?CTRL+C");//?訂閱消息的回調函數DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received2?'"?+?message?+?"'");};//?消費者,有消息時出發訂閱回調函數channel.basicConsume(queueName,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;public?class?Sender?{private?static?final?String?EXCHANGE_NAME?=?"logs";public?static?void?main(String[]?argv)?throws?Exception?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");String?message?=?"publish?subscribe?message";channel.basicPublish(EXCHANGE_NAME,?"",?null,?message.getBytes("UTF-8"));System.out.println("?[x]?Sent?'"?+?message?+?"'");channel.close();connection.close();} }路由模式
import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver1?{private?final?static?String?QUEUE_NAME?=?"queue_routing";private?final?static?String?EXCHANGE_NAME?=?"exchange_direct";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?指定路由的key,接收key和key2channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"key");channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"key2");channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});}} import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver2?{private?final?static?String?QUEUE_NAME?=?"queue_routing2";private?final?static?String?EXCHANGE_NAME?=?"exchange_direct";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?僅接收key2channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"key2");channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Sender?{private?final?static?String?EXCHANGE_NAME?=?"exchange_direct";private?final?static?String?EXCHANGE_TYPE?=?"direct";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();//?交換機聲明channel.exchangeDeclare(EXCHANGE_NAME,?EXCHANGE_TYPE);//?只有routingKey相同的才會消費String?message?=?"routing?mode?message";channel.basicPublish(EXCHANGE_NAME,?"key2",?null,?message.getBytes());System.out.println("[x]?Sent?'"?+?message?+?"'"); //????????channel.basicPublish(EXCHANGE_NAME,?"key",?null,?message.getBytes()); //????????System.out.println("[x]?Sent?'"?+?message?+?"'");channel.close();connection.close();} }主題模式
import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver1?{private?final?static?String?QUEUE_NAME?=?"queue_topic";private?final?static?String?EXCHANGE_NAME?=?"exchange_topic";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?可以接收key.1channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"key.*");channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver2?{private?final?static?String?QUEUE_NAME?=?"queue_topic2";private?final?static?String?EXCHANGE_NAME?=?"exchange_topic";private?final?static?String?EXCHANGE_TYPE?=?"topic";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?*號代表單個單詞,可以接收key.1channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"*.*");//?#號代表多個單詞,可以接收key.1.2channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"*.#");channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Sender?{private?final?static?String?EXCHANGE_NAME?=?"exchange_topic";private?final?static?String?EXCHANGE_TYPE?=?"topic";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,?EXCHANGE_TYPE);String?message?=?"topics?model?message?with?key.1";channel.basicPublish(EXCHANGE_NAME,?"key.1",?null,?message.getBytes());System.out.println("[x]?Sent?'"?+?message?+?"'");String?message2?=?"topics?model?message?with?key.1.2";channel.basicPublish(EXCHANGE_NAME,?"key.1.2",?null,?message2.getBytes());System.out.println("[x]?Sent?'"?+?message2?+?"'");channel.close();connection.close();} }四種交換機介紹
直連交換機(Direct exchange): 具有路由功能的交換機,綁定到此交換機的時候需要指定一個routing_key,交換機發送消息的時候需要routing_key,會將消息發送道對應的隊列
扇形交換機(Fanout exchange): 廣播消息到所有隊列,沒有任何處理,速度最快
主題交換機(Topic exchange): 在直連交換機基礎上增加模式匹配,也就是對routing_key進行模式匹配,*代表一個單詞,#代表多個單詞
首部交換機(Headers exchange): 忽略routing_key,使用Headers信息(一個Hash的數據結構)進行匹配,優勢在于可以有更多更靈活的匹配規則
總結
這么多種隊列模式中都有其應用場景,大家可以根據應用場景示例中進行選擇
參考
RabbitMQ官方教程
官方教程源碼
總結
以上是生活随笔為你收集整理的RabbitMQ中7种消息队列和保姆级代码演示!的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring Cache 实战:兼容所有
- 下一篇: JavaScript | 将十进制转换为