mq 接口 java_Rabbitmq Java Client Api详解
AMQP
AMQP協議是一個高級抽象層消息通信協議,RabbitMQ是AMQP協議的實現。
基礎概念快速入門
每個rabbitmq-server叫做一個Broker,等著tcp連接進入。
在rabbitmq-server進程內有Exchange,定義了這個消息的發送類型。(一對多、直連、多對多等等)
Queue是進程內的邏輯隊列,有多個,有名字。
Binding聯系Exchane與Queue。
Routing key由生產者指定。Binding key由消費者指定。二者聯合決定一條消息的來去。
java client api
連接
ConnectionFactory?factory?=?new?ConnectionFactory();
factory.setHost(hostName);
Connection?conn?=?factory.newConnection();
Channel?channel?=?conn.createChannel();
以上是得到一個rabbitmq連接最最基礎的代碼,當然了,還可以設置一些諸如用戶名密碼的事情。
最后這個channel就可以用來收和發消息了。
消息者線程池
ExecutorService?es?=?Executors.newFixedThreadPool(20);
Connection?conn?=?factory.newConnection(es);
消費者時使用,上述自動開了一20個線程的池來搞。
地址數組
Address[]?addrArr?=?new?Address[]{?new?Address(hostname1,?portnumber1)
,?new?Address(hostname2,?portnumber2)};
Connection?conn?=?factory.newConnection(addrArr);
上述代碼如果連hostname1失敗了就去hostname2。
factory.newConnection()會觸發這個檢測。
聲明exchange與queue
channel.exchangeDeclare(exchangeName,?"direct",?true);
String?queueName?=?channel.queueDeclare().getQueue();
channel.queueBind(queueName,?exchangeName,?routingKey);
channel.exchangeDeclare 參數有 交換機名字 ?類型 ?是否持久化 ?不使用時是否自動刪除 是否是內部的(不能被客戶端使用) 其他參數
channel.queueDeclare 參數有 queue名字 是否持久化 獨占的queue(僅供此連接) 不使用時是否自動刪除 其他參數
channel.queueBind 參數有 queue名字 交換機名字 此次綁定使用的路由關鍵字 ?其他參數
發出消息
byte[]?messageBodyBytes?=?"Hello,?world!".getBytes();
channel.basicPublish(exchangeName,?routingKey,?null,?messageBodyBytes);
channel.basicPublish 參數有 要發出的交換機名字 ?路由關鍵字 是否強制(設置為true時,找不到收的人時可以通過returnListener返回) ?是否立即(其實rabbitmq不支持) 其他屬性 消息主體
線程安全
Channel是線程好全的,但是最好是每個線程里用自己的Channel,因為在單個Channel里排隊是有可能慢一些的。
最簡單的辦法消費消息
boolean?autoAck?=?false;
channel.basicConsume(queueName,?autoAck,?"myConsumerTag",
new?DefaultConsumer(channel)?{
@Override
public?void?handleDelivery(String?consumerTag,
Envelope?envelope,
AMQP.BasicProperties?properties,
byte[]?body)?throws?IOException?{
String?routingKey?=?envelope.getRoutingKey();
String?contentType?=?properties.contentType;
long?deliveryTag?=?envelope.getDeliveryTag();
//?(process?the?message?components?here?...)
channel.basicAck(deliveryTag,?false);
}
});
一個Channel一個Consumer。
channel.basicAck 回發ACK 參數 tag 是否多個。
零碎
channel.basicQos 指定服務質量設置 參數 最大的投送字節數 ?最大的投送消息數量 ?設置是否要應用到整個channel(而不是一個消費者)。
factory.setAutomaticRecoveryEnabled(true) 網絡有問題時,好后可自動恢復設置。
cf.setRequestedHeartbeat(5) 設置心跳時間。
exchange type可用的值:direct topic headers fanout。
exchange的類型有一個default,basicPublish沒有指定時使用,而且,如果routingKey在指定綁定的時候,會去到綁定的exchange。
channel.queueDeclare().getQueue() 得到的是一個隨機queue,斷開連接后即刪除。
當exchange為direct的時候routingKey與bindingKey必須完全一致才能消費消息。
總結
以上是生活随笔為你收集整理的mq 接口 java_Rabbitmq Java Client Api详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: db_mysql.so_vsftpd在m
- 下一篇: java 编辑我的世界_Editing