javascript
Spring Boot 整合 Netty(附源码)
前言
本篇文章主要介紹的是SpringBoot整合Netty以及使用Protobuf進行數據傳輸的相關內容。Protobuf會簡單的介紹下用法,至于Netty在之前的文章中已經簡單的介紹過了,這里就不再過多細說了。
Protobuf
介紹
protocolbuffer(以下簡稱PB)是google 的一種數據交換的格式,它獨立于語言,獨立于平臺。google 提供了多種語言的實現:java、c#、c++、go 和python,每一種實現都包含了相應語言的編譯器以及庫文件。
由于它是一種二進制的格式,比使用 xml進行數據交換快許多。可以把它用于分布式應用之間的數據通信或者異構環境下的數據交換。作為一種效率和兼容性都很優秀的二進制數據傳輸格式,可以用于諸如網絡傳輸、配置文件、數據存儲等諸多領域。
官方地址:
https://github.com/google/protobuf
使用
這里的使用就只介紹Java相關的使用。首先我們需要建立一個proto文件,在該文件定義我們需要傳輸的文件。
例如我們需要定義一個用戶的信息,包含的字段主要有編號、名稱、年齡。
那么該protobuf文件的格式如下:
注:這里使用的是proto3,相關的注釋我已寫了,這里便不再過多講述了。需要注意一點的是proto文件和生成的Java文件名稱不能一致!
syntax?=?"proto3"; //?生成的包名 option?java_package="com.pancm.protobuf"; //生成的java名 option?java_outer_classname?=?"UserInfo";message?UserMsg?{//?IDint32?id?=?1;//?姓名string?name?=?2;//?年齡int32?age?=?3;//?狀態int32?state?=?4; }創建好該文件之后,我們把該文件和protoc.exe(生成Java文件的軟件)放到E盤目錄下的protobuf文件夾下,然后再到該目錄的dos界面下輸入:protoc.exe --java_out=文件絕對路徑名稱。
例如:
protoc.exe?--java_out=E:\protobuf?User.proto輸入完之后,回車即可在同級目錄看到已經生成好的Java文件,然后將該文件放到項目中該文件指定的路徑下即可。
注:生成protobuf的文件軟件和測試的protobuf文件我也整合到該項目中了,可以直接獲取的。
Java文件生成好之后,我們再來看怎么使用。
這里我就直接貼代碼了,并且將注釋寫在代碼中,應該更容易理解些。
代碼示例:
//?按照定義的數據結構,創建一個對象UserInfo.UserMsg.Builder?userInfo?=?UserInfo.UserMsg.newBuilder();userInfo.setId(1);userInfo.setName("xuwujing");userInfo.setAge(18);UserInfo.UserMsg?userMsg?=?userInfo.build();//?將數據寫到輸出流ByteArrayOutputStream?output?=?new?ByteArrayOutputStream();userMsg.writeTo(output);//?將數據序列化后發送byte[]?byteArray?=?output.toByteArray();//?接收到流并讀取ByteArrayInputStream?input?=?new?ByteArrayInputStream(byteArray);//?反序列化UserInfo.UserMsg?userInfo2?=?UserInfo.UserMsg.parseFrom(input);System.out.println("id:"?+?userInfo2.getId());System.out.println("name:"?+?userInfo2.getName());System.out.println("age:"?+?userInfo2.getAge());注:這里說明一點,因為protobuf是通過二進制進行傳輸,所以需要注意下相應的編碼。還有使用protobuf也需要注意一下一次傳輸的最大字節長度。
輸出結果:
id:1 name:xuwujing age:18 SpringBoot整合Netty說明:如果想直接獲取工程那么可以直接跳到底部,通過鏈接下載工程代碼。
-
開發準備
-
環境要求
-
JDK:1.8
-
Netty: 4.0或以上(不包括5)
-
Protobuf:3.0或以上
如果對Netty不熟的話,可以看看這些文章。大神請無視~。~
https://blog.csdn.net/column/details/17640.html
首先還是Maven的相關依賴:
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>1.8</java.version><netty.version>4.1.22.Final</netty.version><protobuf.version>3.5.1</protobuf.version><springboot>1.5.9.RELEASE</springboot><fastjson>1.2.41</fastjson><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>${springboot}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>${springboot}</version><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><version>${springboot}</version><optional>true</optional></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>${netty.version}</version></dependency><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>${protobuf.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson}</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency> </dependencies>添加了相應的maven依賴之后,配置文件這塊暫時沒有什么可以添加的,因為暫時就一個監聽的端口而已。
代碼編寫
代碼模塊主要分為服務端和客戶端。
主要實現的業務邏輯:
服務端啟動成功之后,客戶端也啟動成功,這時服務端會發送一條protobuf格式的信息給客戶端,然后客戶端給予相應的應答。客戶端與服務端連接成功之后,客戶端每個一段時間會發送心跳指令給服務端,告訴服務端該客戶端還存過中,如果客戶端沒有在指定的時間發送信息,服務端會關閉與該客戶端的連接。當客戶端無法連接到服務端之后,會每隔一段時間去嘗試重連,只到重連成功!
服務端
首先是編寫服務端的啟動類,相應的注釋在代碼中寫得很詳細了,這里也不再過多講述了。不過需要注意的是,在之前的我寫的Netty文章中,是通過main方法直接啟動服務端,因此是直接new一個對象的。而在和SpringBoot整合之后,我們需要將Netty交給springBoot去管理,所以這里就用了相應的注解。
代碼如下:
@Service("nettyServer") public?class?NettyServer?{private?static?final?int?port?=?9876;?//?設置服務端端口private?static?EventLoopGroup?boss?=?new?NioEventLoopGroup();?//?通過nio方式來接收連接和處理連接private?static?EventLoopGroup?work?=?new?NioEventLoopGroup();?//?通過nio方式來接收連接和處理連接private?static?ServerBootstrap?b?=?new?ServerBootstrap();@Autowiredprivate?NettyServerFilter?nettyServerFilter;public?void?run()?{try?{b.group(boss,?work);b.channel(NioServerSocketChannel.class);b.childHandler(nettyServerFilter);?//?設置過濾器//?服務器綁定端口監聽ChannelFuture?f?=?b.bind(port).sync();System.out.println("服務端啟動成功,端口是:"?+?port);//?監聽服務器關閉監聽f.channel().closeFuture().sync();}?catch?(InterruptedException?e)?{e.printStackTrace();}?finally?{//?關閉EventLoopGroup,釋放掉所有資源包括創建的線程work.shutdownGracefully();boss.shutdownGracefully();}} }服務端主類編寫完畢之后,我們再來設置下相應的過濾條件。
這里需要繼承Netty中ChannelInitializer類,然后重寫initChannel該方法,進行添加相應的設置,如心跳超時設置,傳輸協議設置,以及相應的業務實現類。
代碼如下:
????@Componentpublic?class?NettyServerFilter?extends?ChannelInitializer<SocketChannel>?{@Autowiredprivate?NettyServerHandler?nettyServerHandler;@Overrideprotected?void?initChannel(SocketChannel?ch)?throws?Exception?{ChannelPipeline?ph?=?ch.pipeline();//入參說明:?讀超時時間、寫超時時間、所有類型的超時時間、時間格式ph.addLast(new?IdleStateHandler(5,?0,?0,?TimeUnit.SECONDS));//?解碼和編碼,應和客戶端一致//傳輸的協議?Protobufph.addLast(new?ProtobufVarint32FrameDecoder());ph.addLast(new?ProtobufDecoder(UserMsg.getDefaultInstance()));ph.addLast(new?ProtobufVarint32LengthFieldPrepender());ph.addLast(new?ProtobufEncoder());//業務邏輯實現類ph.addLast("nettyServerHandler",?nettyServerHandler);}}服務相關的設置的代碼寫完之后,我們再來編寫主要的業務代碼。
使用Netty編寫業務層的代碼,我們需要繼承ChannelInboundHandlerAdapter 或SimpleChannelInboundHandler類,在這里順便說下它們兩的區別吧。
繼承SimpleChannelInboundHandler類之后,會在接收到數據后會自動release掉數據占用的Bytebuffer資源。并且繼承該類需要指定數據格式。
而繼承ChannelInboundHandlerAdapter則不會自動釋放,需要手動調用ReferenceCountUtil.release()等方法進行釋放。繼承該類不需要指定數據格式。所以在這里,個人推薦服務端繼承ChannelInboundHandlerAdapter,手動進行釋放,防止數據未處理完就自動釋放了。而且服務端可能有多個客戶端進行連接,并且每一個客戶端請求的數據格式都不一致,這時便可以進行相應的處理。
客戶端根據情況可以繼承SimpleChannelInboundHandler類。好處是直接指定好傳輸的數據格式,就不需要再進行格式的轉換了。
代碼如下:
@Service("nettyServerHandler") public?class?NettyServerHandler?extends?ChannelInboundHandlerAdapter?{/**?空閑次數?*/private?int?idle_count?=?1;/**?發送次數?*/private?int?count?=?1;/***?建立連接時,發送一條消息*/@Overridepublic?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{System.out.println("連接的客戶端地址:"?+?ctx.channel().remoteAddress());UserInfo.UserMsg?userMsg?=?UserInfo.UserMsg.newBuilder().setId(1).setAge(18).setName("xuwujing").setState(0).build();ctx.writeAndFlush(userMsg);super.channelActive(ctx);}/***?超時處理?如果5秒沒有接受客戶端的心跳,就觸發;?如果超過兩次,則直接關閉;*/@Overridepublic?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?obj)?throws?Exception?{if?(obj?instanceof?IdleStateEvent)?{IdleStateEvent?event?=?(IdleStateEvent)?obj;if?(IdleState.READER_IDLE.equals(event.state()))?{?//?如果讀通道處于空閑狀態,說明沒有接收到心跳命令System.out.println("已經5秒沒有接收到客戶端的信息了");if?(idle_count?>?1)?{System.out.println("關閉這個不活躍的channel");ctx.channel().close();}idle_count++;}}?else?{super.userEventTriggered(ctx,?obj);}}/***?業務邏輯處理*/@Overridepublic?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{System.out.println("第"?+?count?+?"次"?+?",服務端接受的消息:"?+?msg);try?{//?如果是protobuf類型的數據if?(msg?instanceof?UserMsg)?{UserInfo.UserMsg?userState?=?(UserInfo.UserMsg)?msg;if?(userState.getState()?==?1)?{System.out.println("客戶端業務處理成功!");}?else?if(userState.getState()?==?2){System.out.println("接受到客戶端發送的心跳!");}else{System.out.println("未知命令!");}}?else?{System.out.println("未知數據!"?+?msg);return;}}?catch?(Exception?e)?{e.printStackTrace();}?finally?{ReferenceCountUtil.release(msg);}count++;}/***?異常處理*/@Overridepublic?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{cause.printStackTrace();ctx.close();} }還有個服務端的啟動類,之前是通過main方法直接啟動, 不過這里改成了通過springBoot進行啟動,差別不大。
代碼如下:
@SpringBootApplication public?class?NettyServerApp?{public?static?void?main(String[]?args)?{//?啟動嵌入式的?Tomcat?并初始化?Spring?環境及其各?Spring?組件ApplicationContext?context?=?SpringApplication.run(NettyServerApp.class,?args);NettyServer?nettyServer?=?context.getBean(NettyServer.class);nettyServer.run();}}到這里服務端相應的代碼就編寫完畢了。
客戶端
客戶端這邊的代碼和服務端的很多地方都類似,我就不再過多細說了,主要將一些不同的代碼拿出來簡單的講述下。
首先是客戶端的主類,基本和服務端的差不多,也就是多了監聽的端口和一個監聽器(用來監聽是否和服務端斷開連接,用于重連)。
主要實現的代碼邏輯如下:
????public?void?doConnect(Bootstrap?bootstrap,?EventLoopGroup?eventLoopGroup)?{ChannelFuture?f?=?null;try?{if?(bootstrap?!=?null)?{bootstrap.group(eventLoopGroup);bootstrap.channel(NioSocketChannel.class);bootstrap.option(ChannelOption.SO_KEEPALIVE,?true);bootstrap.handler(nettyClientFilter);bootstrap.remoteAddress(host,?port);f?=?bootstrap.connect().addListener((ChannelFuture?futureListener)?->?{final?EventLoop?eventLoop?=?futureListener.channel().eventLoop();if?(!futureListener.isSuccess())?{System.out.println("與服務端斷開連接!在10s之后準備嘗試重連!");eventLoop.schedule(()?->?doConnect(new?Bootstrap(),?eventLoop),?10,?TimeUnit.SECONDS);}});if(initFalg){System.out.println("Netty客戶端啟動成功!");initFalg=false;}//?阻塞f.channel().closeFuture().sync();}}?catch?(Exception?e)?{System.out.println("客戶端連接失敗!"+e.getMessage());}}注:監聽器這塊的實現用的是JDK1.8的寫法。
客戶端過濾其這塊基本和服務端一直。不過需要注意的是,傳輸協議、編碼和解碼應該一致,還有心跳的讀寫時間應該小于服務端所設置的時間。
改動的代碼如下:
???ChannelPipeline?ph?=?ch.pipeline();/**?解碼和編碼,應和服務端一致*?*///入參說明:?讀超時時間、寫超時時間、所有類型的超時時間、時間格式ph.addLast(new?IdleStateHandler(0,?4,?0,?TimeUnit.SECONDS));客戶端的業務代碼邏輯。
主要實現的幾點邏輯是心跳按時發送以及解析服務發送的protobuf格式的數據。
這里比服務端多個個注解, 該注解Sharable主要是為了多個handler可以被多個channel安全地共享,也就是保證線程安全。
廢話就不多說了,代碼如下:
????@Service("nettyClientHandler")@ChannelHandler.Sharablepublic?class?NettyClientHandler?extends?ChannelInboundHandlerAdapter?{@Autowiredprivate?NettyClient?nettyClient;/**?循環次數?*/private?int?fcount?=?1;/***?建立連接時*/@Overridepublic?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{System.out.println("建立連接時:"?+?new?Date());ctx.fireChannelActive();}/***?關閉連接時*/@Overridepublic?void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception?{System.out.println("關閉連接時:"?+?new?Date());final?EventLoop?eventLoop?=?ctx.channel().eventLoop();nettyClient.doConnect(new?Bootstrap(),?eventLoop);super.channelInactive(ctx);}/***?心跳請求處理?每4秒發送一次心跳請求;**/@Overridepublic?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?obj)?throws?Exception?{System.out.println("循環請求的時間:"?+?new?Date()?+?",次數"?+?fcount);if?(obj?instanceof?IdleStateEvent)?{IdleStateEvent?event?=?(IdleStateEvent)?obj;if?(IdleState.WRITER_IDLE.equals(event.state()))?{?//?如果寫通道處于空閑狀態,就發送心跳命令UserMsg.Builder?userState?=?UserMsg.newBuilder().setState(2);ctx.channel().writeAndFlush(userState);fcount++;}}}/***?業務邏輯處理*/@Overridepublic?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{//?如果不是protobuf類型的數據if?(!(msg?instanceof?UserMsg))?{System.out.println("未知數據!"?+?msg);return;}try?{//?得到protobuf的數據UserInfo.UserMsg?userMsg?=?(UserInfo.UserMsg)?msg;//?進行相應的業務處理。。。//?這里就從簡了,只是打印而已System.out.println("客戶端接受到的用戶信息。編號:"?+?userMsg.getId()?+?",姓名:"?+?userMsg.getName()?+?",年齡:"?+?userMsg.getAge());//?這里返回一個已經接受到數據的狀態UserMsg.Builder?userState?=?UserMsg.newBuilder().setState(1);ctx.writeAndFlush(userState);System.out.println("成功發送給服務端!");}?catch?(Exception?e)?{e.printStackTrace();}?finally?{ReferenceCountUtil.release(msg);}}}那么到這里客戶端的代碼也編寫完畢了。
功能測試
首先啟動服務端,然后再啟動客戶端。
我們來看看結果是否如上述所說。
服務端輸出結果:
服務端啟動成功,端口是:9876 連接的客戶端地址:/127.0.0.1:53319 第1次,服務端接受的消息:state:?1客戶端業務處理成功! 第2次,服務端接受的消息:state:?2接受到客戶端發送的心跳! 第3次,服務端接受的消息:state:?2接受到客戶端發送的心跳! 第4次,服務端接受的消息:state:?2接受到客戶端發送的心跳!客戶端輸入結果:
Netty客戶端啟動成功! 建立連接時:Mon Jul 16?23:31:58?CST?2018 客戶端接受到的用戶信息。編號:1,姓名:xuwujing,年齡:18 成功發送給服務端! 循環請求的時間:Mon Jul 16?23:32:02?CST?2018,次數1 循環請求的時間:Mon Jul 16?23:32:06?CST?2018,次數2 循環請求的時間:Mon Jul 16?23:32:10?CST?2018,次數3 循環請求的時間:Mon Jul 16?23:32:14?CST?2018,次數4通過打印信息可以看出如上述所說。
接下來我們再來看看客戶端是否能夠實現重連。
先啟動客戶端,再啟動服務端。
客戶端輸入結果:
Netty客戶端啟動成功! 與服務端斷開連接!在10s之后準備嘗試重連! 客戶端連接失敗!AbstractChannel$CloseFuture@1fbaa3ac(incomplete) 建立連接時:Mon Jul 16?23:41:33?CST?2018 客戶端接受到的用戶信息。編號:1,姓名:xuwujing,年齡:18 成功發送給服務端! 循環請求的時間:Mon Jul 16?23:41:38?CST?2018,次數1 循環請求的時間:Mon Jul 16?23:41:42?CST?2018,次數2 循環請求的時間:Mon Jul 16?23:41:46?CST?2018,次數3服務端輸出結果:
服務端啟動成功,端口是:9876 連接的客戶端地址:/127.0.0.1:53492 第1次,服務端接受的消息:state:?1客戶端業務處理成功! 第2次,服務端接受的消息:state:?2接受到客戶端發送的心跳! 第3次,服務端接受的消息:state:?2接受到客戶端發送的心跳! 第4次,服務端接受的消息:state:?2結果也如上述所說!
其它
關于SpringBoot整合Netty使用Protobuf進行數據傳輸到這里就結束了。
SpringBoot整合Netty使用Protobuf進行數據傳輸的項目工程地址:
https://github.com/xuwujing/springBoot-study/tree/master/springboot-netty-protobuf
對了,也有不使用springBoot整合的Netty項目工程地址:
https://github.com/xuwujing/Netty-study/tree/master/Netty-protobuf
總結
以上是生活随笔為你收集整理的Spring Boot 整合 Netty(附源码)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一本彻底搞懂MySQL索引优化EXPLA
- 下一篇: Spring Boot MongoDB