javascript
Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务
文章目錄
- 概述
- 添加依賴
- 配置文件配置RabbitMQ的地址信息
- 接口定義
- 接收方 @EnableBinding @StreamListener
- 測試
- 消費組
- 發送復雜對象
- 消息回執
- 代碼
概述
官網 : https://spring.io/projects/spring-cloud-stream
概括來說,Spring Cloud Stream 進一步封裝了消息隊列,可以做到代碼層面對消息隊列無感知。
這里我們僅僅是做個入門級別的介紹,更多用法還是參考官網上的指導說明,畢竟最權威了。
添加依賴
無需多說,要想使用Spring Cloud Stream ,第一步肯定是添加依賴了 ,如下
這里使用的消息隊列是 RabbitMQ ,如果你是用的是kafka,換成對應的spring-cloud-starter-stream-kafka依賴即可
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>配置文件配置RabbitMQ的地址信息
spring-cloud-starter-stream-rabbit是Spring Cloud Stream對RabbitMQ的封裝,包含了對RabbitMQ的自動化配置,比如連接的RabbitMQ的默認地址localhost,默認端口5672,默認用戶guest,默認密碼guest,如果采用的是如上默認配置,可以不用修改配置。
這里我把配置文件放到了遠端的Git,通過config server 拉取配置。
RabbitMQ的安裝 ,這里我選擇了使用Docker鏡像,安裝如下
在Docker CE中安裝RabbitMQ
接口定義
可知: Sink和Source兩個接口分別定義了輸入通道和輸出通道,Processor通過繼承Source和Sink,同時具有輸入通道和輸出通道。這里我們就模仿Sink和Source,自定義一個消息通道。
package com.artisan.order.message;import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel;public interface ArtisanSink {// 同一個服務里面的通道名字不能一樣,在不同的服務里可以相同名字的通道// 否則啟動拋出如下異常 bean definition with this name already existsString INPUT = "MyMsgInput";@Input(ArtisanSink.INPUT)SubscribableChannel input();}如上定義了一個名為MyMsgInput的消息輸入通道,@Input注解的參數則表示了消息通道的名稱
接收方 @EnableBinding @StreamListener
StreamReceive 用來接收RabbitMQ發送來的消息
package com.artisan.order.message;import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component;/*** 接收方*/@Component // Step1 注解 綁定剛才的接口 @EnableBinding(ArtisanSink.class) @Slf4j public class StreamReceive {// Step2 @StreamListener 綁定對象的名稱@StreamListener(ArtisanSink.INPUT)public void processStreamMsg(Object msg){log.info("StreamReceive: {}",msg);}}- 第一步: 使用了@EnableBinding注解實現對消息通道的綁定,我們在該注解中還傳入了一個參數ArtisanSink.class,ArtisanSink是一個自定義接口,主要功能是實現對輸入消息通道綁定的定義。
- 第二步:在StreamReceive 類中定義了processStreamMsg方法,重點是在該方法上添加了@StreamListener注解,該注解表示該方法為消息中間件上數據流的事件監聽器,ArtisanSink.INPUT參數表示這是input消息通道上的監聽處理器。
測試
模擬發送發發送消息,方便起見,我們直接在Controller層寫個方法吧
package com.artisan.order.controller;import com.artisan.order.message.Sink; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController;@RestController @Slf4j public class MsgStreamController {@Autowiredprivate ArtisanSink sink;@GetMapping("/sendMsgByStream")public void sendMsgByStream(){String message = "I am one msg sent by Spring Cloud Stream";sink.input().send(MessageBuilder.withPayload(message).build());} }通過 @Autowired自動注入剛才的Sink接口,然后調用 sink.input().send方法發送消息即可。
啟動服務,觀察RabbitMQ上的隊列 ,自動創建了一個
點進去看下
MyMsgInput和 在接口中的定義一致 。
訪問: http://localhost:8081/sendMsgByStream
觀察日志:
2019-04-13 10:56:32.749 INFO 820 --- [nio-8081-exec-4] com.artisan.order.message.StreamReceive : StreamReceive: I am one msg sent by Spring Cloud Stream接收方收到了一條消息如上,OK。
消費組
需求: 由于服務可能會有多個實例同時在運行,我們只希望消息被一個實例所接收
先來改造下項目,啟動多個服務實例
為了多啟動幾個節點,我們需要把定義在遠端Git上的要加載到bootstrap.yml中的端口信息給注釋掉,否則第二個端口因端口沖突起不來。
然后通過如下方式在JVM參數中指定啟動端口
第一個app 啟動端口 -Dserver.port=8082
第一個app 啟動端口 -Dserver.port=5656
啟動后查看在Eureka Server上的注冊情況
再看看RabbitMQ的消息隊列情況,兩個 OK
舊版本中 ,如果不做任何設置,此時發送一條消息將會被所有的實例接收到 ,但是可以通過消息分組來解決 。
具體可參考: https://segmentfault.com/a/1190000011796459
主要是配置分組
spring:cloud:stream:bindings:# MyMsgInput 自定義 order消費組MyMsgInput:# 消息組的名稱group: order#輸入通道的主題名destination: MyMsgInput#存在消息隊列中的消息,如果是復雜對象,則以JSON的形式展示content-type: application/json新版本:
Spring Boot : 2.0.3.RELEASE
Spring Cloud : Finchley.RELEASE
經過測試 不存在這個問題
把這倆節點的日志信息都清空掉,重新發送個消息
我們就用5656這個節點好了 ,http://localhost:5656/sendMsgByStream
經過驗證只有5656這一個節點收到了消息。無需設置分組。
發送復雜對象
上面的例子中我們發送的是一個字符串,
如果是復雜對象呢? 來測試下
@GetMapping("/sendMsgByStream2")public void sendMsgByStream2(){OrderDTO orderDTO = new OrderDTO();orderDTO.setOrderId("11111");orderDTO.setOrderAmount(new BigDecimal(9999));sink.input().send(MessageBuilder.withPayload(orderDTO).build());}啟動5656端口的服務,訪問 http://localhost:5656/sendMsgByStream2
觀察日志:
2019-04-13 17:06:47.438 INFO 13764 --- [nio-5656-exec-1] com.artisan.order.message.StreamReceive : StreamReceive: OrderDTO(orderId=11111, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=9999, orderStatus=null, payStatus=null, orderDetailList=null)OK。
這是我們如果把消息消費方注釋掉,讓消息累計在消息隊列中,我們去看下消息隊列中存儲的復雜對象的格式
啟動5656端口的服務,訪問 http://localhost:5656/sendMsgByStream2
org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers消息回執
消費者收到消息后給發送方一個ACK確認,該如何做呢?
比如接收到消息后,返回給ArtisanSource.OUTPUT一個消息,直接使用@SendTo直接即可,就會將返回的字符串發送給ArtisanSource.OUTPUT通道
定義一個
package com.artisan.order.message;import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel;public interface ArtisanSource {String OUTPUT = "MyMsgOutput";@Output(ArtisanSource.OUTPUT)MessageChannel output(); }寫一個該消息的接收方
package com.artisan.order.message;import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component;/*** 接收方*/@Component // Step1 注解 綁定剛才的接口 @EnableBinding(ArtisanSource.class) @Slf4j public class StreamReceive2 {// Step2 @StreamListener 綁定對象的名稱@StreamListener(ArtisanSource.OUTPUT)public void processStreamMsg2(String msg){log.info("OUTPUT StreamReceive: {}",msg);}}啟動微服務,訪問 http://localhost:5656/sendMsgByStream2
2019-04-13 18:06:51.817 INFO 972 --- [nio-5656-exec-1] com.artisan.order.message.StreamReceive : INPUT StreamReceive: OrderDTO(orderId=11111, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=9999, orderStatus=null, payStatus=null, orderDetailList=null) 2019-04-13 18:06:51.823 INFO 972 --- [nio-5656-exec-1] c.artisan.order.message.StreamReceive2 : OUTPUT StreamReceive: received OK代碼
https://github.com/yangshangwei/springcloud-o2o/tree/master/artisan_order
總結
以上是生活随笔為你收集整理的Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring Boot2.x-15 整合
- 下一篇: Spring Cloud【Finchle