springboot使用rabbitMQ(带回调)
生活随笔
收集整理的這篇文章主要介紹了
springboot使用rabbitMQ(带回调)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
springboot提供了各類東西的簡單集成,rabbitMQ也不例外,本文重點介紹如何集成rabbitMQ以及如何使用帶回調的rabbitMQ
萬年不變的第一步:pom
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>生產者
配置文件1:RabbitConfig
package com.mos.eboot.web.config;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope;import javax.annotation.Resource;/*** @author 小塵哥*/ @Configuration public class RabbitConfig {@Resourceprivate RabbitConstants rabbitConstants;@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(rabbitConstants.getHost());connectionFactory.setUsername(rabbitConstants.getUsername());connectionFactory.setVirtualHost(rabbitConstants.getVirtualHost());connectionFactory.setPassword(rabbitConstants.getPassword()); // * 如果要進行消息回調,則這里必須要設置為trueconnectionFactory.setPublisherConfirms(rabbitConstants.getPublisherConfirms());return connectionFactory;}/*** 因為要設置回調類,所以應是prototype類型,如果是singleton類型,則回調類為最后一次設置*/@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate() {return new RabbitTemplate(connectionFactory());}}配置文件2:RabbitConstants(主要用于用戶名、密碼等值從配置文件獲取,也可以用@Value方式)
package com.mos.eboot.web.config;import org.springframework.boot.context.properties.ConfigurationProperties;/*** rabbit配置* @author 小塵哥*/ @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitConstants {public static final String EXCHANGE = "bootExchange";public static final String ROUTINGKEY = "routingkey";public static final String QUEUE = "bootQueue";private String host;private Integer port;private String username;private String password;private Boolean publisherConfirms;private String virtualHost;public String getHost() {return host;}public void setHost(String host) {this.host = host;}public Integer getPort() {return port;}public void setPort(Integer port) {this.port = port;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public Boolean getPublisherConfirms() {return publisherConfirms;}public void setPublisherConfirms(Boolean publisherConfirms) {this.publisherConfirms = publisherConfirms;}public String getVirtualHost() {return virtualHost;}public void setVirtualHost(String virtualHost) {this.virtualHost = virtualHost;} }配置文件3:DemoSender,即實際的消息發送者
package com.mos.eboot.web.sender;import com.mos.eboot.tools.util.IDGen; import com.mos.eboot.web.config.RabbitConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;/*** @author 小塵哥*/ @Component public class DemoSender implements RabbitTemplate.ConfirmCallback{private static final Logger LOGGER = LoggerFactory.getLogger(DemoSender.class);private RabbitTemplate rabbitTemplate;@Autowiredpublic DemoSender(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;this.rabbitTemplate.setConfirmCallback(this);}public void send(String msg) {CorrelationData correlationData = new CorrelationData(IDGen.genId());LOGGER.info("send: " + correlationData.getId());this.rabbitTemplate.convertAndSend(RabbitConstants.EXCHANGE, RabbitConstants.ROUTINGKEY, msg, correlationData);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm: " + correlationData.getId());} }測試:DemoController
package com.mos.eboot.web.controller;import com.mos.eboot.tools.controller.BaseController; import com.mos.eboot.tools.result.ResultModel; import com.mos.eboot.web.config.RabbitConstants; import com.mos.eboot.web.sender.DemoSender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource; import javax.servlet.http.HttpSession;/*** @author 小塵哥*/ @RestController @RequestMapping("demo") public class DemoController extends BaseController {private static final Logger LOGGER = LoggerFactory.getLogger(DemoController.class);@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate DemoSender demoSender;@RequestMapping("amqp")public ResultModel amqp(){rabbitTemplate.convertAndSend(RabbitConstants.QUEUE,"1message from web");rabbitTemplate.convertAndSend("exchange","topic.messages","2message from web for exchage");rabbitTemplate.convertAndSend(RabbitConstants.EXCHANGE,RabbitConstants.ROUTINGKEY,"3message from web for fanoutExchange");//主要是下面這個demoSender.send("message from web for fanoutExchange1234234");return ResultModel.defaultSuccess(null);} }消費者
配置都相同,添加一個Listener,用來接收消息
package com.mos.eboot.consumer.config;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.handler.annotation.Payload;/*** @author 小塵哥*/ @Configuration @RabbitListener(queues = RabbitConstants.QUEUE) public class Listener {/** 設置交換機類型 */@Beanpublic DirectExchange defaultExchange() {/*** DirectExchange:按照routingkey分發到指定隊列* TopicExchange:多關鍵字匹配* FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念* HeadersExchange :通過添加屬性key-value匹配*/return new DirectExchange(RabbitConstants.EXCHANGE);}@Beanpublic Queue fooQueue() {return new Queue(RabbitConstants.QUEUE);}@Beanpublic Binding binding() {/** 將隊列綁定到交換機 */return BindingBuilder.bind(fooQueue()).to(defaultExchange()).with(RabbitConstants.ROUTINGKEY);}@RabbitHandlerpublic void process(@Payload String foo) {System.out.println("Listener: " + foo);} }yml配置
spring:redis:database: 0# Redis服務器地址host: 127.0.0.1# Redis服務器連接端口port: 6379# Redis服務器連接密碼(默認為空)password: 123456789rabbitmq:host: 172.16.14.93port: 5672username: dreamerpassword: dreamervirtualHost: ebootpublisherConfirms: true測試結果
訪問http://localhost:8881/demo/amqp(根據你的實際情況)
可以看到消費者接收到了所發送的三個消息,但是其中只有第三個demoSender.send()發送的有回調,而在DemoSender中重寫的confirm里也接收到了回調信息。
完整代碼已上傳碼云,戳【eboot】獲取源碼
總結
以上是生活随笔為你收集整理的springboot使用rabbitMQ(带回调)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据绑定(十)Binding的数据转换
- 下一篇: 阿里云Kubernetes服务 - Se