RabbitMQ(四):Exchange交换器--direct
內(nèi)容翻譯自:RabbitMQ Tutorials Java版
RabbitMQ(一):Hello World程序
RabbitMQ(二):Work Queues、循環(huán)分發(fā)、消息確認(rèn)、持久化、公平分發(fā)
RabbitMQ(三):Exchange交換器--fanout
RabbitMQ(四):Exchange交換器--direct
RabbitMQ(五):Exchange交換器--topic
RabbitMQ(六):回調(diào)隊(duì)列callback queue、關(guān)聯(lián)標(biāo)識(shí)correlation id、實(shí)現(xiàn)簡(jiǎn)單的RPC系統(tǒng)
RabbitMQ(七):常用方法說(shuō)明 與 學(xué)習(xí)小結(jié)
Routing:
在上一篇博客中,我們創(chuàng)建了一個(gè)簡(jiǎn)單的日志系統(tǒng)。我們可以將日志消息廣播給所有的接收者(消費(fèi)者)。
在這個(gè)教程中,我們將為我們的日志系統(tǒng)添加一個(gè)功能:僅僅訂閱一部分消息。比如,我們可以直接將關(guān)鍵的錯(cuò)誤類型日志消息保存到日志文件中,還可以同時(shí)將所有的日志消息打印到控制臺(tái)。
綁定(Bindings):
在之前的例子中,我們已經(jīng)創(chuàng)建了綁定:
channel.queueBind(queueName, EXCHANGE_NAME, "");一個(gè)綁定是建立在一個(gè)隊(duì)列和一個(gè)路由器之間的關(guān)系,可以解讀為:該隊(duì)列對(duì)這個(gè)路由器中的消息感興趣。
綁定可以設(shè)置另外的參數(shù):路由鍵routingKey。為了避免和void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)中的routingKey混淆,我們將這里的key稱為綁定鍵binding key,下面的代碼展示了如何使用綁定鍵來(lái)創(chuàng)建一個(gè)綁定關(guān)系:
channel.queueBind(queueName, EXCHANGE_NAME, "black");綁定鍵的含義取決于路由器的類型,我們之前使用的fanout類型路由器會(huì)忽略該值。
直接路由器 (Direct Exchange):
我們之前的日志系統(tǒng)會(huì)將所有消息廣播給所有消費(fèi)者。現(xiàn)在我們想根據(jù)日志的嚴(yán)重程度來(lái)過(guò)濾日志。比如,我們想要一個(gè)程序來(lái)將error日志寫到磁盤文件中,而不要將warning或info日志寫到磁盤中,以免浪費(fèi)磁盤空間。
我們之前使用的fanout路由器缺少靈活性,它只是沒(méi)頭腦地廣播消息。所以,我們用direct路由器來(lái)替換它。direct路由器背后的路由算法很簡(jiǎn)單:只有當(dāng)消息的路由鍵routing key與隊(duì)列的綁定鍵binding key完全匹配時(shí),該消息才會(huì)進(jìn)入該隊(duì)列。
為了演示上面拗口的表述中的意思,考慮下面的設(shè)置:
上圖中,直接路由器X與兩個(gè)隊(duì)列綁定。第一個(gè)隊(duì)列以綁定鍵orange來(lái)綁定,第二個(gè)隊(duì)列以兩個(gè)綁定鍵black和green和路由器綁定。
按照這種設(shè)置,路由鍵為orange的消息在發(fā)布給路由器后,將會(huì)被路由到隊(duì)列Q1,路由鍵為black或者green的消息將會(huì)路由到隊(duì)列Q2。
多重綁定(Multiple bindings):
多個(gè)隊(duì)列以相同的綁定鍵binding key綁定到同一個(gè)Exchange上,是完全可以的。按照這種方式設(shè)置的話,直接路由器就會(huì)像fanout路由器一樣,將消息廣播給所有符合路由規(guī)則的隊(duì)列。一個(gè)路由鍵為black的消息將會(huì)發(fā)布到隊(duì)列Q1和Q2。
發(fā)布消息:
在這個(gè)教程中,我們使用direct路由器來(lái)代替上個(gè)教程中的fanout路由器。同時(shí),我們?yōu)槿罩驹O(shè)置嚴(yán)重級(jí)別,并將此作為路由鍵。這樣,接收者(消費(fèi)者)就可以選擇性地接收日志消息。
首先,創(chuàng)建一個(gè)路由器:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");接著,發(fā)送一個(gè)消息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());簡(jiǎn)單起見,我們假設(shè)severity只能是info、warning 、error中的一種。
消息訂閱:
接收消息將會(huì)和之前的教程類似,只是我們會(huì)為每一個(gè)級(jí)別的消息來(lái)創(chuàng)建不同的綁定:
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){channel.queueBind(queueName, EXCHANGE_NAME, severity); }放在一塊:
生產(chǎn)者EmitLogDirect.java的完整代碼:
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {//創(chuàng)建連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明路由器和路由器的類型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String severity = "info";String message = ".........i am msg.........";//發(fā)布消息channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + severity + "':'" + message + "'");channel.close();connection.close();}}消費(fèi)者ReceiveLogsDirect.java的完整代碼如下:
import com.rabbitmq.client.*; import java.io.IOException;public class ReceiveLogsDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {//建立連接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明路由器和類型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//聲明隊(duì)列String queueName = channel.queueDeclare().getQueue();//定義要監(jiān)聽的級(jí)別String[] severities = {"info", "warning", "error"};//根據(jù)綁定鍵綁定for (String severity : severities) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");}};channel.basicConsume(queueName, true, consumer);} }現(xiàn)在可以進(jìn)行測(cè)試了。首先,啟動(dòng)一個(gè)消費(fèi)者實(shí)例(ReceiveLogsDirect.java),然后將其中的要監(jiān)聽的級(jí)別改為String[] severities = {"error"};,再啟動(dòng)另一個(gè)消費(fèi)者實(shí)例。此時(shí),這兩個(gè)消費(fèi)者都開始監(jiān)聽了,一個(gè)監(jiān)聽所有級(jí)別的日志消息,另一個(gè)監(jiān)聽error日志消息。
然后,啟動(dòng)生產(chǎn)者(EmitLogDirect.java),之后將String severity = "info";中的info,分別改為warning、error后運(yùn)行。
這樣,就可以在控制臺(tái)看到如下輸出:
?
說(shuō)明:
①與原文略有出入,如有疑問(wèn),請(qǐng)參閱原文
②原文均是編譯后通過(guò)javacp命令直接運(yùn)行程序,我是在IDE中進(jìn)行的,相應(yīng)的操作做了修改。
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ(四):Exchange交换器--direct的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: RabbitMQ(三):Exchange
- 下一篇: RabbitMQ(五):Exchange