Netty组件
2019獨角獸企業重金招聘Python工程師標準>>>
1、NioEventLoopGroup
一個Netty服務端啟動時,通常會有兩個NioEventLoopGroup:一個boss,一個worker。第一個NioEventLoopGroup正常只需要一個EventLoop,主要負責客戶端的連接請求,然后打開一個Channel,交給第二個NioEventLoopGroup中的一個EventLoop來處理這個Channel上的所有讀寫事件。一個Channel只會被一個EventLoop處理,而一個EventLoop可能會被分配給多個Channel。
2、ChannelFuture 接口
由于Netty中的所有I/O操作都是異步的,因此Netty為了解決調用者如何獲取異步操作結果的問題而專門設計了ChannelFuture接口.?
可以調用ChannelFuture里的方法獲取channel,Channel與ChannelFuture可以說形影不離的.
3、Channel(通道)
可以理解為該類對socket進行了封裝,提供了數據的讀取和寫入功能。
4、ChannelHandler
Netty 定義了下面兩個重要的ChannelHandler 子接口:
ChannelInboundHandler(入站handler) —— channel上有消息可讀時,netty就會將該消息交給入站處理器處理。可以有過個處理器,形成一個鏈條。
內部方法:
當某個 ChannelInboundHandler 的實現重寫 channelRead()方法時,可以顯式地釋放與池化的 ByteBuf 實例相關的內存。Netty 為此提供了一個實用方法 ReferenceCountUtil.release();比如
但是以這種方式管理資源可能很繁瑣,一個更加簡單的方式是使用SimpleChannelInboundHandler 會自動釋放資源。如下:
?
ChannelOutboundHandler(出站handler) —— channel寫出數據時,會經過出站處理器的處理后,再發送到網絡上。可以有多個處理器,形成一個鏈條。
內部方法:
ChannelPromise與ChannelFuture:ChannelOutboundHandler中的大部分方法都需要一個ChannelPromise參數,以便在操作完成時得到通知。ChannelPromise是ChannelFuture的一個子類,其定義了一些可寫的方法,如setSuccess()和setFailure(),從而使ChannelFuture不可變。
注意:
如果一個消息被消費或者丟棄了,并且沒有傳遞給 ChannelPipeline 中的下一個ChannelOutboundHandler,那么用戶就有責任調用 ReferenceCountUtil.release()。
比如:
如果消息到達了實際的傳輸層,那么當它被寫入時或者 Channel 關閉時,都將被自動釋放。
5、ChannelPipeline
Netty會把出站Handler和入站Handler放到一個Pipeline中,同屬一個方向的Handler則是有順序的,因為上一個Handler處理的結果往往是下一個Handler的要求的輸入。
6、ChannelHandlerContext
ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之間的關聯,每當有 ChannelHandler 添加到 ChannelPipeline 中時,都會創建 ChannelHandlerContext。ChannelHandlerContext 的主要功能是管理它所關聯的 ChannelHandler 和在同一個 ChannelPipeline 中的其他 ChannelHandler之間的交互。
內部方法:
消息向下傳遞ctx.fireChannelRead(msg); 這個方法可能你可能會用到,其次就是write相關方法比較常用。
?
7、 實戰多個入站和出站處理器
7.1 服務端
Message對象:
@Data public class Message implements Serializable{//消息內容private String content;//消息時間戳long time; }入站處理器1:將Byte轉為Messag對象
/*** 將ByteBuf數據 轉為對象*/ public class EchoServerInHandler1 extends ChannelInboundHandlerAdapter {/*** 服務端讀取到網絡數據后的處理* @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {//ByteBuf為netty實現的緩沖區ByteBuf in = (ByteBuf)msg;String msgStr = in.toString(CharsetUtil.UTF_8);System.out.println("EchoServerInHandler1 處理:" + msgStr);Message message = JSON.parseObject(msgStr,new TypeReference<Message>(){});ReferenceCountUtil.release(msg);//通過使用 ReferenceCountUtil.realse(...)方法釋放資源ctx.fireChannelRead(message);}}?
入站處理器2:業務處理
public class EchoServerInHandler2 extends SimpleChannelInboundHandler<Message> {/*** 服務端讀取到網絡數據后的處理* @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead0(ChannelHandlerContext ctx, Message msg)throws Exception {System.out.println("EchoServerInHandler2 收到消息:" + msg);Message message = new Message();message.setContent("success");ctx.writeAndFlush(message);} }?
出站處理器1:將Message對象轉為字節發送到網絡
public class EchoServerOutHandler1 extends ChannelOutboundHandlerAdapter{@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("EchoServerOutHandler1 處理:"+msg);Message message = (Message)msg;String messageJson = JSON.toJSONString(message);ByteBuf byteBuf = Unpooled.copiedBuffer(messageJson, CharsetUtil.UTF_8);ctx.write(byteBuf, promise);} }出站處理器2:給Message對象加上時間戳
public class EchoServerOutHandler2 extends ChannelOutboundHandlerAdapter{@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("EchoServerOutHandler2 處理:"+msg);Message message = (Message) msg;message.setTime(System.currentTimeMillis());ctx.write(msg, promise);} }?
在pipeline上添加入站和出站處理器
public class EchoServer {private int port;private EventLoopGroup bossGroup;private EventLoopGroup workGroup;private ServerBootstrap b;public EchoServer(int port) {this.port = port;//第一個線程組是用于接收Client連接的bossGroup = new NioEventLoopGroup(1);//第二個線程組是用于消息的讀寫操作workGroup = new NioEventLoopGroup(2);//服務端輔助啟動類b = new ServerBootstrap();b.group(bossGroup, workGroup)//需要指定使用NioServerSocketChannel這種類型的通道.channel(NioServerSocketChannel.class)//向ChannelPipeline里添加業務處理handler.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//出站處理器 先注冊后執行ch.pipeline().addLast(new EchoServerOutHandler1());//out 1ch.pipeline().addLast(new EchoServerOutHandler2());//out 2//入站處理器 先注冊先執行ch.pipeline().addLast(new EchoServerInHandler1());//in1ch.pipeline().addLast(new EchoServerInHandler2());//in2}});}/*** 啟動* @throws InterruptedException*/public void start() throws InterruptedException {try {//綁定到端口,阻塞等待直到完成b.bind(this.port).sync();System.out.println("服務器啟動成功");} finally {}}/*** 資源優雅釋放*/public void close() {try {if (bossGroup != null)bossGroup.shutdownGracefully().sync();if (workGroup != null)workGroup.shutdownGracefully().sync();} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws InterruptedException, IOException {int port = 9999;EchoServer echoServer = new EchoServer(port);try {echoServer.start();//防止主程序退出System.in.read();} finally {echoServer.close();}} }需要注意處理器的順序:
先說最基本的,讀入數據,執行順序和注冊順序一致 in1 --> in2 ,他們之間通過?ctx.fireChannelRead(msg);進行傳遞。
從EchoServerInHandler1開始執行到EchoServerInHandler2,邏輯處理,進行數據發送返回, 通過ctx.writeAndFlush()就完成從in -->out的轉換。
ctx.writeAndFlush()從當前節點往前查找out類handler,所以就會以 out2----》out1 這樣一種方式執行。
假如下面這種順序, 出站處理器放到后面,這時調用ctx.writeAndFlush()往前找out類型的處理器就會找不到。這種情況要想執行outhandler的處理,應該執行ctx.channel().writeAndFlush();這是從鏈表結尾開始往前查找out類handler,這就是兩種writeAndFlush的區別
?
7.2 客戶端代碼
public class EchoClient {private final int port;private final String host;private Channel channel;private EventLoopGroup group;private Bootstrap b;public EchoClient(String host, int port) {this.host = host;this.port = port;//客戶端啟動輔助類b = new Bootstrap();//構建線程組 處理讀寫group = new NioEventLoopGroup();b.group(group)//指明使用NIO進行網絡通訊.channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//出站處理器 先注冊后執行ch.pipeline().addLast(new EchoClientOutHandler1());ch.pipeline().addLast(new EchoClientOutHandler2());//入站處理器 先注冊先執行ch.pipeline().addLast(new EchoClientInHandler1());ch.pipeline().addLast(new EchoClientInHandler2());}});}public void start() {try {//連接到遠程節點,阻塞等待直到連接完成ChannelFuture f = b.connect(new InetSocketAddress(host, port)).sync();//同步獲取channel(通道,實際上是對socket的封裝支持讀取和寫入)channel = f.sync().channel();} catch (InterruptedException e) {e.printStackTrace();}}/*** 發送消息* @param msg* @return*/public boolean send(String msg) {Message message = new Message();message.setContent(msg);channel.writeAndFlush(message);return true;}/*** 關閉釋放資源*/public void close(){try {if (group != null)group.shutdownGracefully().sync();} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws InterruptedException {EchoClient client = new EchoClient("127.0.0.1", 9999);try {client.start();Scanner scanner = new Scanner(System.in);while (true) {String msg = scanner.next();client.send(msg);}}finally {client.close();}} }public class EchoClientInHandler1 extends ChannelInboundHandlerAdapter{/**讀取數據* @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {//ByteBuf為netty實現的緩沖區ByteBuf in = (ByteBuf)msg;String msgStr = in.toString(CharsetUtil.UTF_8);System.out.println("EchoClientInHandler1 處理:" + msgStr);Message message = JSON.parseObject(msgStr,new TypeReference<Message>(){});ctx.fireChannelRead(message);}/*** 當Channel 處于活動狀態時被調用;Channel 已經連接/綁定并且已經就緒* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("EchoClientInHandler1 channelActive");}/*** exceptionCaught()事件處理方法當出現Throwable對象才會被調用* 即當Netty由于IO錯誤或者處理器在處理事件時拋出的異常時* @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {cause.printStackTrace();ctx.close();} }public class EchoClientInHandler2 extends SimpleChannelInboundHandler<Message> {/*** 客戶端讀取到數據* @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead0(ChannelHandlerContext ctx, Message msg)throws Exception {System.out.println("EchoClientInHandler2 handler:" + msg);} }/*** 該處理器將對象轉為字節 發送到網絡上*/ public class EchoClientOutHandler1 extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("EchoClientOutHandler1 處理:"+msg);Message message = (Message)msg;String messageJson = JSON.toJSONString(message);ByteBuf byteBuf = Unpooled.copiedBuffer(messageJson, CharsetUtil.UTF_8);ctx.write(byteBuf, promise);} }/*** 該處理器給message加上時間戳*/ public class EchoClientOutHandler2 extends ChannelOutboundHandlerAdapter{@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("EchoClientOutHandler2 處理:"+msg);Message message = (Message) msg;message.setTime(System.currentTimeMillis());ctx.write(msg, promise);} }?
啟動服務端和客戶端,然后發送消息
轉載于:https://my.oschina.net/suzheworld/blog/3006174
總結
- 上一篇: Vue 组件间的通讯
- 下一篇: SSM中shiro的基本使用