arouter跨module传递消息_消息队列中间件(二)使用 ActiveMQ
ActiveMQ 介紹
Active MQ 是由 Apache 出品的一款流行的功能強大的開源消息中間件,它速度快,支持跨語言的客戶端,具有易于使用的企業集成模式和許多的高級功能,同時完全支持?JSM1.1?和 J2EE1.4 。
- 官方下載地址: - ?http://activemq.apache.org/download.html?? 
- 官方安裝教程: - ?http://activemq.apache.org/getting-started.html?? 
- 默認管理頁面: - http://127.0.0.1:8161/admin/?? 
- 默認用戶名和密碼為 - admin / admin。 - conf / jetty-real.properties 您可以在此文件中進行配置。?? 
- 默認服務端口:61616?? 
ActiveMQ 特點
- 支持Java,C,C ++,C#,Ruby,Perl,Python,PHP等各種跨語言客戶端和協議,如 OpenWire , Stomp , AMQP , MQTT. 
- 完全支持JMS 1.1和 J2EE 1.4,支持瞬態,持久,事務和XA消息傳遞。 
- 對 Spring 框架的支持以便ActiveMQ可以輕松嵌入到Spring應用程序中。 
- 通過了常見的 J2EE 服務器測試,如 TomEE,Geronimo,JBoss,GlassFish 和 WebLogic 。 
- 連接方式的多樣化,ActiveMQ 提供了多種連接模式,例如 in-VM、TCP、SSL、NIO、UDP、多播、JGroups、JXTA。 
- 可以通過使用 JDBC 和 journal 實現消息的快速持久化。 
- 專為高性能群集,客戶端 - 服務器,點對點通信而設計。 
- 提供與語言無關的 REST API。 
- 支持 Ajax 方式調用 ActiveMQ。 
- ActiveMQ 可以輕松地與 CXF、Axis 等 Web Service 技術整合,以提供可靠的消息傳遞。 
- 可用作為內存中的 JMS 提供者,非常適合 JMS 單元測試。 
ActiveMQ 消息
點對點隊列模式
消息到達消息系統,被保留在消息隊列中,然后由一個或者多個消費者消費隊列中的消息,一個消息只能被一個消費者消費,然后就會被移除。例如訂單處理系統。
發布-訂閱模式
消息發送時指定主題(或者說通道),消息被保留在指定的主題中,消費者可以訂閱多個主題,并使用主題中的所有的消息,例如現實中的電視與電視頻道。所有客戶端包括發布者和訂閱者,主題中的消息可以被所有的訂閱者消費,消費者只能消費訂閱之后發送到主題中的消息。
ActiveMQ 概念
- Broker,消息代理,表示消息隊列服務器實體,接受客戶端連接,提供消息通信的核心服務。 
- Producer,消息生產者,業務的發起方,負責生產消息并傳輸給 Broker 。 
- Consumer,消息消費者,業務的處理方,負責從 Broker 獲取消息并進行業務邏輯處理。 
- Topic,主題,發布訂閱模式下的消息統一匯集地,不同生產者向 Topic 發送消息,由 Broker 分發到不同的訂閱者,實現消息的廣播。 
- Queue,隊列,點對點模式下特定生產者向特定隊列發送消息,消費者訂閱特定隊列接收消息并進行業務邏輯處理。 
- Message,消息體,根據不同通信協議定義的固定格式進行編碼的數據包,來封裝業務 數據,實現消息的傳輸。 
ActiveMQ 工程實例
下面是使用 ActiveMQ 的隊列模式和發布-訂閱模式的 Java 代碼示例。
POM 依賴
? ? ? ? ? ? ? ?<dependency> ? ? ? ? ? ?<groupId>org.apache.activemqgroupId> ? ? ? ? ? ?<artifactId>activemq-allartifactId> ? ? ? ? ? ?<version>5.15.5version> ? ? ? ?dependency>隊列模式消費者
import?org.apache.activemq.ActiveMQConnectionFactory;import?javax.jms.*;
/**
?*?
?*?消息消費者,用于消費消息
?*
?*?@Author?niujinpeng
?*?@Date?2018/9/4?23:45
?*/
public?class?AppConsumer?{
????private?static?final?String?url?=?"tcp://127.0.0.1:61616";
????private?static?final?String?queueName?=?"queue-test";
????public?static?void?main(String[]?args)?throws?JMSException?{
????????//?1.創建ConnectionFactory
????????ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory();
????????//?2.創建Connection
????????Connection?connection?=?connectionFactory.createConnection();
????????//?3.啟動連接
????????connection.start();
????????//?4.創建會話,false,不使用事務,自動應答模式
????????Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
????????//?5.創建一個目標
????????Destination?destination?=?session.createQueue(queueName);
????????//?6.創建消費者
????????MessageConsumer?consumer?=?session.createConsumer(destination);
????????//?7.創建一個監聽器
????????consumer.setMessageListener(new?MessageListener()?{
????????????public?void?onMessage(Message?message)?{
????????????????TextMessage?textMessage?=?(TextMessage)?message;
????????????????try?{
????????????????????System.out.println("接收消息:"?+?textMessage.getText());
????????????????}?catch?(JMSException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????});
????????//?8.關閉連接
????????//connection.close();
????}
}
隊列模式生產者
import?org.apache.activemq.ActiveMQConnectionFactory;import?javax.jms.*;
/**
?*?
?*?消息提供者,用于向消息中間件發送消息
?*
?*?@Author?niujinpeng
?*?@Date?2018/9/4?23:28
?*/
public?class?AppProducer?{
????private?static?final?String?url?=?"tcp://127.0.0.1:61616";
????private?static?final?String?queueName?=?"queue-test";
????public?static?void?main(String[]?args)?throws?JMSException?{
????????//?1.創建ConnectionFactory
????????ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory();
????????//?2.創建Connection
????????Connection?connection?=?connectionFactory.createConnection();
????????//?3.啟動連接
????????connection.start();
????????//?4.創建會話,false,不使用事務,自動應答模式
????????Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
????????//?5.創建一個目標
????????Destination?destination?=?session.createQueue(queueName);
????????//?6.創建生產者
????????MessageProducer?producer?=?session.createProducer(destination);
????????//?7.創建消息并發送
????????for?(int?i?=?0;?i?10;?i++)?{
????????????//?創建消息
????????????TextMessage?textMessage?=?session.createTextMessage("textMessage"?+?i);
????????????//?發布消息
????????????producer.send(textMessage);
????????????System.out.println("發送消息:"?+?textMessage.getText());
????????}
????????//?8.關閉連接
????????connection.close();
????}
}
隊列模式生產者
import?org.apache.activemq.ActiveMQConnectionFactory;import?javax.jms.*;
/**
?*?
?*?消息提供者,用于向消息中間件發送消息
?*
?*?@Author?niujinpeng
?*?@Date?2018/9/4?23:28
?*/
public?class?AppProducer?{
????private?static?final?String?url?=?"tcp://127.0.0.1:61616";
????private?static?final?String?queueName?=?"queue-test";
????public?static?void?main(String[]?args)?throws?JMSException?{
????????//?1.創建ConnectionFactory
????????ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory();
????????//?2.創建Connection
????????Connection?connection?=?connectionFactory.createConnection();
????????//?3.啟動連接
????????connection.start();
????????//?4.創建會話,false,不使用事務,自動應答模式
????????Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
????????//?5.創建一個目標
????????Destination?destination?=?session.createQueue(queueName);
????????//?6.創建生產者
????????MessageProducer?producer?=?session.createProducer(destination);
????????//?7.創建消息并發送
????????for?(int?i?=?0;?i?10;?i++)?{
????????????//?創建消息
????????????TextMessage?textMessage?=?session.createTextMessage("textMessage"?+?i);
????????????//?發布消息
????????????producer.send(textMessage);
????????????System.out.println("發送消息:"?+?textMessage.getText());
????????}
????????//?8.關閉連接
????????connection.close();
????}
}
發布訂閱模式生產者
import?org.apache.activemq.ActiveMQConnectionFactory;import?javax.jms.*;
/**
?*?
?*?主題模式
?*?消息消費者,用于消費消息
?*
?*?@Author?niujinpeng
?*?@Date?2018/9/4?23:45
?*/
public?class?AppConsumer?{
????private?static?final?String?url?=?"tcp://127.0.0.1:61616";
????private?static?final?String?topicName?=?"topic-test";
????public?static?void?main(String[]?args)?throws?JMSException?{
????????//?1.創建ConnectionFactory
????????ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory();
????????//?2.創建Connection
????????Connection?connection?=?connectionFactory.createConnection();
????????//?3.啟動連接
????????connection.start();
????????//?4.創建會話,false,不使用事務,自動應答模式
????????Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
????????//?5.創建一個目標
????????Destination?destination?=?session.createTopic(topicName);
????????//?6.創建消費者
????????MessageConsumer?consumer?=?session.createConsumer(destination);
????????//?7.創建一個監聽器
????????consumer.setMessageListener(new?MessageListener()?{
????????????public?void?onMessage(Message?message)?{
????????????????TextMessage?textMessage?=?(TextMessage)?message;
????????????????try?{
????????????????????System.out.println("接收消息:"?+?textMessage.getText());
????????????????}?catch?(JMSException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????});
????????//?8.關閉連接
????????//connection.close();
????}
}
發布訂閱模式生產者
import?org.apache.activemq.ActiveMQConnectionFactory;import?javax.jms.*;
/**
?*?
?*?主題模式
?*?消息提供者,用于向消息中間件發送消息
?*
?*?@Author?niujinpeng
?*?@Date?2018/9/4?23:28
?*/
public?class?AppProducer?{
????private?static?final?String?url?=?"tcp://127.0.0.1:61616";
????private?static?final?String?topicName?=?"topic-test";
????public?static?void?main(String[]?args)?throws?JMSException?{
????????//?1.創建ConnectionFactory
????????ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory();
????????//?2.創建Connection
????????Connection?connection?=?connectionFactory.createConnection();
????????//?3.啟動連接
????????connection.start();
????????//?4.創建會話,false,不使用事務,自動應答模式
????????Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
????????//?5.創建一個目標
????????Destination?destination?=?session.createTopic(topicName);
????????//?6.創建生產者
????????MessageProducer?producer?=?session.createProducer(destination);
????????//?7.創建消息并發送
????????for?(int?i?=?0;?i?10;?i++)?{
????????????//?創建消息
????????????TextMessage?textMessage?=?session.createTextMessage("textMessage"?+?i);
????????????//?發布消息
????????????producer.send(textMessage);
????????????System.out.println("發送消息:"?+?textMessage.getText());
????????}
????????//?8.關閉連接
????????connection.close();
????}
}
GitHub源碼:
https://github.com/niumoo/message-queue
Spring 整合 ActiveMQ
在 Spring 中配置 Active MQ 就像Spring 整合其他功能一樣,我們需要在 XML 配置中配置幾個關鍵的實例即可。在 Active MQ 中有幾個對象的實例是至關重要的,如 Active MQ jms 連接工廠,為了減少連接斷開性能時間消耗的 jms 連接池以及生產者消費者等。
下面是一些詳細說明。
- ConnectionFactory 用于管理連接的連接工廠(Spring提供)。 
- 一個 Spring 為我們提供的連接池。 
- JmsTemplate 每次發送都會重新創建連接,會話和 Productor。 
- Spring 中提供了SingleConnectionFactory 和CachingConnectionFactory(增加了緩存功能)。 
- JmsTemplate 是用于發送和接收消息的模板類。 
- 是spring提供的,只需要向Spring 容器內注冊這個類就可以使用 JmsTemplate 方便的操作jms。 
- JmsTemplate 類是線程安全的,可以在整個應用范圍使用。 
- MessageListerner 消息監聽器 
- 使用一個onMessage方法,該方法只接收一個Message參數。 
POM 依賴
<properties>????????<spring.version>5.0.4.RELEASEspring.version>
????properties>
????<dependencies>
????????<dependency>
????????????<groupId>junitgroupId>
????????????<artifactId>junitartifactId>
????????????<version>4.11version>
????????????<scope>testscope>
????????dependency>
????????<dependency>
????????????<groupId>org.springframeworkgroupId>
????????????<artifactId>spring-contextartifactId>
????????????<version>${spring.version}version>
????????dependency>
????????<dependency>
????????????<groupId>org.springframeworkgroupId>
????????????<artifactId>spring-jmsartifactId>
????????????<version>5.1.1.RELEASEversion>
????????dependency>
????????<dependency>
????????????<groupId>org.springframeworkgroupId>
????????????<artifactId>spring-testartifactId>
????????????<version>${spring.version}version>
????????dependency>
????????
????????<dependency>
????????????<groupId>javax.jmsgroupId>
????????????<artifactId>javax.jms-apiartifactId>
????????????<version>2.0.1version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.activemqgroupId>
????????????<artifactId>activemq-coreartifactId>
????????????<version>5.7.0version>
????????????<exclusions>
????????????????<exclusion>
????????????????????<artifactId>spring-contextartifactId>
????????????????????<groupId>org.springframeworkgroupId>
????????????????exclusion>
????????????????<exclusion>
????????????????????<groupId>org.apache.geronimo.specsgroupId>
????????????????????<artifactId>geronimo-jms_1.1_specartifactId>
????????????????exclusion>
????????????exclusions>
????????dependency>
????dependencies>
XML 配置
XML 公共配置
為了份文件配置方便管理,下面是提取出來的公共配置,為了在獨立配置生產者和消費者 XML文件時引入,當然也可以直接把生產者和消費者以及所有的 XML bean 配置在一個文件里。
<?xml ?version="1.0"?encoding="UTF-8"?><beans?xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:content="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans.xsd?http://www.springframework.org/schema/context?http://www.springframework.org/schema/context/spring-context.xsd">
????<content:annotation-config/>
????
????<bean?id="targerConnectionFactory"?class="org.apache.activemq.ActiveMQConnectionFactory">
????????<property?name="brokerURL"?value="tcp://127.0.0.1:61616"/>
????bean>
????
????<bean?id="connectionFactory"?class="org.springframework.jms.connection.SingleConnectionFactory">
????????<property?name="targetConnectionFactory"?ref="targerConnectionFactory"/>
????bean>
????
????<bean?id="queueDestination"?class="org.apache.activemq.command.ActiveMQQueue">
????????<constructor-arg?value="queue-spring"/>
????bean>
????
????<bean?id="topicDestination"?class="org.apache.activemq.command.ActiveMQTopic">
????????<constructor-arg?value="topic-spring"/>
????bean>
beans>
XML 消費者
消費者主要是一個消息監聽器,監聽指定的隊列或者主題的消息信息,來有消息時調用回調監聽處理方法。這里我注釋掉了監聽的隊列模式,指定了主題模式。
<?xml ?version="1.0"?encoding="UTF-8"?><beans?xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:content="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans.xsd?http://www.springframework.org/schema/context?http://www.springframework.org/schema/context/spring-context.xsd">
????
????<import?resource="common.xml"/>
????
????<bean?id="consumerMessageListener"?class="net.codingme.jms.consumer.ConsumerMessageListener"/>
????
????<bean?id="jmsContainer"?class="org.springframework.jms.listener.DefaultMessageListenerContainer">
????????<property?name="connectionFactory"?ref="connectionFactory"/>
????????
????????
????????
????????<property?name="destination"?ref="topicDestination"/>
????????<property?name="messageListener"?ref="consumerMessageListener"/>
????bean>
beans>
XML 生產者
生成者的配置主要是使用 spring jms 模版對象,創建生產者實例用于生產消息。
<?xml ?version="1.0"?encoding="UTF-8"?><beans?xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:content="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans.xsd?http://www.springframework.org/schema/context?http://www.springframework.org/schema/context/spring-context.xsd">
????
????<import?resource="common.xml"/>
????
????<bean?id="jmsTemplate"?class="org.springframework.jms.core.JmsTemplate">
????????<property?name="connectionFactory"?ref="connectionFactory"/>
????bean>
????<bean?class="net.codingme.jms.producer.ProducerServiceImpl">bean>
beans>
生產者編寫
1. 定義接口
package?net.codingme.jms.producer;/**
?*?
?*
?*?@Author?niujinpeng
?*?@Date?2018/11/2518:19
?*/
public?interface?ProducerService?{
????public?void?sendMessage(String?message);
}
2. 主題模式生產者
package?net.codingme.jms.producer;import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.jms.core.JmsTemplate;
import?org.springframework.jms.core.MessageCreator;
import?javax.annotation.Resource;
import?javax.jms.*;
/**
?*?
?*
?*?@Author?niujinpeng
?*?@Date?2018/11/25?19:24
?*/
public?class?ProducerServiceImpl?implements?ProducerService?{
????@Autowired
????JmsTemplate?jmsTemplate;
????/**
?????*?主題模式
?????*/
????@Resource(name?=?"topicDestination")
????Destination?destination;
????@Override
????public?void?sendMessage(String?message)?{
????????//?使用jmsTemplate發送消息
????????jmsTemplate.send(destination,?new?MessageCreator()?{
????????????//?創建消息
????????????@Override
????????????public?Message?createMessage(Session?session)?throws?JMSException?{
????????????????TextMessage?textMessage?=?session.createTextMessage(message);
????????????????return?textMessage;
????????????}
????????});
????????System.out.println("發送消息:"?+?message);
????}
}
3. Spring 啟動 生產者
package?net.codingme.jms.producer;import?org.springframework.context.support.ClassPathXmlApplicationContext;
/**
?*?
?*?啟動器
?*
?*?@Author?niujinpeng
?*?@Date?2018/11/25?21:48
?*/
public?class?AppProducer?{
????public?static?void?main(String[]?args)?{
????????//?裝載配置文件
????????ClassPathXmlApplicationContext?context?=?new?ClassPathXmlApplicationContext("classpath:producer.xml");
????????ProducerService?service?=?context.getBean(ProducerService.class);
????????for?(int?i?=?0;?i?10;?i++)?{
????????????service.sendMessage("test"?+?i);
????????}
????????context.close();
????}
}
消費者編寫
Spring啟動和生產者類似。下面是消費者監聽器的實現。
package?net.codingme.jms.consumer;import?javax.jms.JMSException;
import?javax.jms.Message;
import?javax.jms.MessageListener;
import?javax.jms.TextMessage;
/**
?*?
?*?消息監聽器
?*
?*?@Author?niujinpeng
?*?@Date?2018/11/25?22:28
?*/
public?class?ConsumerMessageListener?implements?MessageListener?{
????@Override
????public?void?onMessage(Message?message)?{
????????TextMessage?textMessage?=?(TextMessage)?message;
????????try?{
????????????System.out.println("接收消息:"?+?textMessage.getText());
????????}?catch?(JMSException?e)?{
????????????e.printStackTrace();
????????}
????}
}
運行測試
首先主題模式下啟動兩個消費者,使用生產者推送10條消息。
在每個消費者下面都可以看到推送的完整消息。
文中代碼已經上傳到GitHub:
https://github.com/niumoo/message-queue
推薦閱讀
(點擊標題可跳轉閱讀)
夯實Java基礎系列16:一文讀懂Java IO流和常見面試題
夯實Java基礎系列15:Java注解簡介和最佳實踐
夯實Java基礎系列14:深入理解Java枚舉類
夯實Java基礎系列11:深入理解Java中的回調機制
夯實Java基礎系列10:深入理解Java中的異常體系
夯實Java基礎系列9:深入理解Class類和Object類
夯實Java基礎系列8:深入理解Java內部類及其實現原理
夯實Java基礎系列7:一文讀懂Java 代碼塊和代碼執行順序
一文搞懂抽象類和接口,從基礎到面試題,揭秘其本質區別!
一文讀懂 Java 文件和包結構,解讀開發中常用的 jar 包
一文了解 final 關鍵字的特性、使用方法以及實現原理
點個“在看”,轉發朋友圈,都是對我最好的支持!總結
以上是生活随笔為你收集整理的arouter跨module传递消息_消息队列中间件(二)使用 ActiveMQ的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: window服务器cpu过高的排查_高频
- 下一篇: 梳妆镜对卧室门好不好
