Netty实现自定义协议
關于協議,使用最為廣泛的是HTTP協議,但是在一些服務交互領域,其使用則相對較少,主要原因有三方面:
-
HTTP協議會攜帶諸如header和cookie等信息,其本身對字節的利用率也較低,這使得HTTP協議比較臃腫,在承載相同信息的情況下,HTTP協議將需要發送更多的數據包;
-
HTTP協議是基于TCP的短連接,其在每次請求和響應的時候都需要進行三次握手和四次揮手,由于服務的交互設計一般都要求能夠承載高并發的請求,因而HTTP協議這種頻繁的握手和揮手動作會極大的影響服務之間交互的效率;
-
服務之間往往有一些根據其自身業務特性所獨有的需求,而HTTP協議無法很好的服務于這些業務需求。
基于上面的原因,一般的服務之間進行交互時都會使用自定義協議,常見的框架,諸如dubbo,kafka,zookeeper都實現了符合其自身業務需求的協議,本文主要講解如何使用Netty實現一款自定義的協議。
1. 協議規定
所謂協議,其本質其實就是定義了一個將數據轉換為字節,或者將字節轉換為數據的一個規范。一款自定義協議,其一般包含兩個部分:消息頭和消息體。消息頭的長度一般是固定的,或者說是可確定的,其定義了此次消息的一些公有信息,比如當前服務的版本,消息的sessionId,消息的類型等等;消息體則主要是此次消息所需要發送的內容,一般在消息頭的最后一定的字節中保存了當前消息的消息體的長度。下面是我們為當前自定義協議所做的一些規定:
上述協議定義中,我們除了定義常用的請求和響應消息類型以外,還定義了Ping和Pong消息。Ping和Pong消息的作用一般是,在服務處于閑置狀態達到一定時長,比如2s時,客戶端服務會向服務端發送一個Ping消息,則會返回一個Pong消息,這樣才表示客戶端與服務端的連接是完好的。如果服務端沒有返回相應的消息,客戶端就會關閉與服務端的連接或者是重新建立與服務端的連接。這樣的優點在于可以防止突然會產生的客戶端與服務端的大量交互。
2. 協議實現
通過上面的定義其實我們可以發現,所謂協議,就是定義了一個規范,基于這個規范,我們可以將消息轉換為相應的字節流,然后經由TCP傳輸到目標服務,目標服務則也基于該規范將字節流轉換為相應的消息,這樣就達到了相互交流的目的。這里面最重要的主要是如何基于該規范將消息轉換為字節流或者將字節流轉換為消息。這一方面,Netty為我們提供了ByteToMessageDecoder和MessageToByteEncoder用于進行消息和字節流的相互轉換。首先我們定義了如下消息實體:
public?class?Message?{private?int?magicNumber;private?byte?mainVersion;private?byte?subVersion;private?byte?modifyVersion;private?String?sessionId;private?MessageTypeEnum?messageType;private?Map<String,?String>?attachments?=?new?HashMap<>();private?String?body;public?Map<String,?String>?getAttachments()?{return?Collections.unmodifiableMap(attachments);}public?void?setAttachments(Map<String,?String>?attachments)?{this.attachments.clear();if?(null?!=?attachments)?{this.attachments.putAll(attachments);}}public?void?addAttachment(String?key,?String?value)?{attachments.put(key,?value);}//?getter?and?setter... }上述消息中,我們將協議中所規定的各個字段都進行了定義,并且定義了一個標志消息類型的枚舉MessageTypeEnum,如下是該枚舉的源碼:
public?enum?MessageTypeEnum?{REQUEST((byte)1),?RESPONSE((byte)2),?PING((byte)3),?PONG((byte)4),?EMPTY((byte)5);private?byte?type;MessageTypeEnum(byte?type)?{this.type?=?type;}public?int?getType()?{return?type;}public?static?MessageTypeEnum?get(byte?type)?{for?(MessageTypeEnum?value?:?values())?{if?(value.type?==?type)?{return?value;}}throw?new?RuntimeException("unsupported?type:?"?+?type);} }上述主要是定義了描述自定義協議相關的實體屬性,對于消息的編碼,本質就是依據上述協議方式將消息實體轉換為字節流,如下是轉換字節流的代碼:
public?class?MessageEncoder?extends?MessageToByteEncoder<Message>?{@Overrideprotected?void?encode(ChannelHandlerContext?ctx,?Message?message,?ByteBuf?out)?{//?這里會判斷消息類型是不是EMPTY類型,如果是EMPTY類型,則表示當前消息不需要寫入到管道中if?(message.getMessageType()?!=?MessageTypeEnum.EMPTY)?{out.writeInt(Constants.MAGIC_NUMBER);?//?寫入當前的魔數out.writeByte(Constants.MAIN_VERSION);?//?寫入當前的主版本號out.writeByte(Constants.SUB_VERSION);?//?寫入當前的次版本號out.writeByte(Constants.MODIFY_VERSION);?//?寫入當前的修訂版本號if?(!StringUtils.hasText(message.getSessionId()))?{//?生成一個sessionId,并將其寫入到字節序列中String?sessionId?=?SessionIdGenerator.generate();message.setSessionId(sessionId);out.writeCharSequence(sessionId,?Charset.defaultCharset());}out.writeByte(message.getMessageType().getType());?//?寫入當前消息的類型out.writeShort(message.getAttachments().size());?//?寫入當前消息的附加參數數量message.getAttachments().forEach((key,?value)?->?{Charset?charset?=?Charset.defaultCharset();out.writeInt(key.length());?//?寫入鍵的長度out.writeCharSequence(key,?charset);?//?寫入鍵數據out.writeInt(value.length());?//?希爾值的長度out.writeCharSequence(value,?charset);?//?寫入值數據});if?(null?==?message.getBody())?{out.writeInt(0);?//?如果消息體為空,則寫入0,表示消息體長度為0}?else?{out.writeInt(message.getBody().length());out.writeCharSequence(message.getBody(),?Charset.defaultCharset());}}} }對于消息的解碼,其過程與上面的消息編碼方式基本一致,主要是基于協議所規定的將字節流數據轉換為消息實體數據。如下是其轉換過程:
public?class?MessageDecoder?extends?ByteToMessageDecoder?{@Overrideprotected?void?decode(ChannelHandlerContext?ctx,?ByteBuf?byteBuf,?List<Object>?out)?throws?Exception?{Message?message?=?new?Message();message.setMagicNumber(byteBuf.readInt());??//?讀取魔數message.setMainVersion(byteBuf.readByte());?//?讀取主版本號message.setSubVersion(byteBuf.readByte());?//?讀取次版本號message.setModifyVersion(byteBuf.readByte());?//?讀取修訂版本號CharSequence?sessionId?=?byteBuf.readCharSequence(Constants.SESSION_ID_LENGTH,?Charset.defaultCharset());?//?讀取sessionIdmessage.setSessionId((String)sessionId);message.setMessageType(MessageTypeEnum.get(byteBuf.readByte()));?//?讀取當前的消息類型short?attachmentSize?=?byteBuf.readShort();?//?讀取附件長度for?(short?i?=?0;?i?<?attachmentSize;?i++)?{int?keyLength?=?byteBuf.readInt();?//?讀取鍵長度和數據CharSequence?key?=?byteBuf.readCharSequence(keyLength,?Charset.defaultCharset());int?valueLength?=?byteBuf.readInt();?//?讀取值長度和數據CharSequence?value?=?byteBuf.readCharSequence(valueLength,?Charset.defaultCharset());message.addAttachment(key.toString(),?value.toString());}int?bodyLength?=?byteBuf.readInt();?//?讀取消息體長度和數據CharSequence?body?=?byteBuf.readCharSequence(bodyLength,?Charset.defaultCharset());message.setBody(body.toString());out.add(message);} }如此,我們自定義消息與字節流的相互轉換工作已經完成。對于消息的處理,主要是要根據消息的不同類型,對消息進行相應的處理,比如對于request類型消息,要寫入響應數據,對于ping消息,要寫入pong消息作為回應。下面我們通過定義Netty handler的方式實現對消息的處理:
//?服務端消息處理器 public?class?ServerMessageHandler?extends?SimpleChannelInboundHandler<Message>?{//?獲取一個消息處理器工廠類實例private?MessageResolverFactory?resolverFactory?=?MessageResolverFactory.getInstance();@Overrideprotected?void?channelRead0(ChannelHandlerContext?ctx,?Message?message)?throws?Exception?{Resolver?resolver?=?resolverFactory.getMessageResolver(message);?//?獲取消息處理器Message?result?=?resolver.resolve(message);?//?對消息進行處理并獲取響應數據ctx.writeAndFlush(result);?//?將響應數據寫入到處理器中}@Overridepublic?void?channelRegistered(ChannelHandlerContext?ctx)?throws?Exception?{resolverFactory.registerResolver(new?RequestMessageResolver());?//?注冊request消息處理器resolverFactory.registerResolver(new?ResponseMessageResolver());//?注冊response消息處理器resolverFactory.registerResolver(new?PingMessageResolver());?//?注冊ping消息處理器resolverFactory.registerResolver(new?PongMessageResolver());?//?注冊pong消息處理器} } //?客戶端消息處理器 public?class?ClientMessageHandler?extends?ServerMessageHandler?{//?創建一個線程,模擬用戶發送消息private?ExecutorService?executor?=?Executors.newSingleThreadExecutor();@Overridepublic?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{//?對于客戶端,在建立連接之后,在一個獨立線程中模擬用戶發送數據給服務端executor.execute(new?MessageSender(ctx));}/***?這里userEventTriggered()主要是在一些用戶事件觸發時被調用,這里我們定義的事件是進行心跳檢測的*?ping和pong消息,當前觸發器會在指定的觸發器指定的時間返回內如果客戶端沒有被讀取消息或者沒有寫入*?消息到管道,則會觸發當前方法*/@Overridepublic?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?evt)?throws?Exception?{if?(evt?instanceof?IdleStateEvent)?{IdleStateEvent?event?=?(IdleStateEvent)?evt;if?(event.state()?==?IdleState.READER_IDLE)?{//?一定時間內,當前服務沒有發生讀取事件,也即沒有消息發送到當前服務來時,//?其會發送一個Ping消息到服務器,以等待其響應Pong消息Message?message?=?new?Message();message.setMessageType(MessageTypeEnum.PING);ctx.writeAndFlush(message);}?else?if?(event.state()?==?IdleState.WRITER_IDLE)?{//?如果當前服務在指定時間內沒有寫入消息到管道,則關閉當前管道ctx.close();}}} ??private?static?final?class?MessageSender?implements?Runnable?{private?static?final?AtomicLong?counter?=?new?AtomicLong(1);private?volatile?ChannelHandlerContext?ctx;public?MessageSender(ChannelHandlerContext?ctx)?{this.ctx?=?ctx;}@Overridepublic?void?run()?{try?{while?(true)?{//?模擬隨機發送消息的過程TimeUnit.SECONDS.sleep(new?Random().nextInt(3));Message?message?=?new?Message();message.setMessageType(MessageTypeEnum.REQUEST);message.setBody("this?is?my?"?+?counter.getAndIncrement()?+?"?message.");message.addAttachment("name",?"xufeng");ctx.writeAndFlush(message);}}?catch?(InterruptedException?e)?{e.printStackTrace();}}} }上述代碼中,由于客戶端和服務端需要處理的消息類型是完全一樣的,因而客戶端處理類繼承了服務端處理類。但是對于客戶端而言,其還需要定時向服務端發送心跳消息,用于檢測客戶端與服務器的連接是否健在,因而客戶端還會實現userEventTriggered()方法,在該方法中定時向服務器發送心跳消息。userEventTriggered()方法主要是在客戶端被閑置一定時間后,其會根據其讀取或者寫入消息的限制時長來選擇性的觸發讀取或寫入事件。
上述實現中,我們看到,對于具體類型消息的處理,我們是通過一個工廠類來獲取對應的消息處理器,然后處理相應的消息,下面我們該工廠類的代碼:
public?final?class?MessageResolverFactory?{//?創建一個工廠類實例private?static?final?MessageResolverFactory?resolverFactory?=?new?MessageResolverFactory();private?static?final?List<Resolver>?resolvers?=?new?CopyOnWriteArrayList<>();private?MessageResolverFactory()?{}//?使用單例模式實例化當前工廠類實例public?static?MessageResolverFactory?getInstance()?{return?resolverFactory;}public?void?registerResolver(Resolver?resolver)?{resolvers.add(resolver);}//?根據解碼后的消息,在工廠類處理器中查找可以處理當前消息的處理器public?Resolver?getMessageResolver(Message?message)?{for?(Resolver?resolver?:?resolvers)?{if?(resolver.support(message))?{return?resolver;}}throw?new?RuntimeException("cannot?find?resolver,?message?type:?"?+?message.getMessageType());}}上述工廠類比較簡單,主要就是通過單例模式獲取一個工廠類實例,然后提供一個根據具體消息來查找其對應的處理器的方法。下面我們來看看各個消息處理器的代碼:
//?request類型的消息 public?class?RequestMessageResolver?implements?Resolver?{private?static?final?AtomicInteger?counter?=?new?AtomicInteger(1);@Overridepublic?boolean?support(Message?message)?{return?message.getMessageType()?==?MessageTypeEnum.REQUEST;}@Overridepublic?Message?resolve(Message?message)?{//?接收到request消息之后,對消息進行處理,這里主要是將其打印出來int?index?=?counter.getAndIncrement();System.out.println("[trx:?"?+?message.getSessionId()?+?"]"+?index?+?".?receive?request:?"?+?message.getBody());System.out.println("[trx:?"?+?message.getSessionId()?+?"]"+?index?+?".?attachments:?"?+?message.getAttachments());//?處理完成后,生成一個響應消息返回Message?response?=?new?Message();response.setMessageType(MessageTypeEnum.RESPONSE);response.setBody("nice?to?meet?you?too!");response.addAttachment("name",?"xufeng");response.addAttachment("hometown",?"wuhan");return?response;} } //?響應消息處理器 public?class?ResponseMessageResolver?implements?Resolver?{private?static?final?AtomicInteger?counter?=?new?AtomicInteger(1);@Overridepublic?boolean?support(Message?message)?{return?message.getMessageType()?==?MessageTypeEnum.RESPONSE;}@Overridepublic?Message?resolve(Message?message)?{//?接收到對方服務的響應消息之后,對響應消息進行處理,這里主要是將其打印出來int?index?=?counter.getAndIncrement();System.out.println("[trx:?"?+?message.getSessionId()?+?"]"+?index?+?".?receive?response:?"?+?message.getBody());System.out.println("[trx:?"?+?message.getSessionId()?+?"]"+?index?+?".?attachments:?"?+?message.getAttachments());//?響應消息不需要向對方服務再發送響應,因而這里寫入一個空消息Message?empty?=?new?Message();empty.setMessageType(MessageTypeEnum.EMPTY);return?empty;} } //?ping消息處理器 public?class?PingMessageResolver?implements?Resolver?{@Overridepublic?boolean?support(Message?message)?{return?message.getMessageType()?==?MessageTypeEnum.PING;}@Overridepublic?Message?resolve(Message?message)?{//?接收到ping消息后,返回一個pong消息返回System.out.println("receive?ping?message:?"?+?System.currentTimeMillis());Message?pong?=?new?Message();pong.setMessageType(MessageTypeEnum.PONG);return?pong;} } //?pong消息處理器 public?class?PongMessageResolver?implements?Resolver?{@Overridepublic?boolean?support(Message?message)?{return?message.getMessageType()?==?MessageTypeEnum.PONG;}@Overridepublic?Message?resolve(Message?message)?{//?接收到pong消息后,不需要進行處理,直接返回一個空的messageSystem.out.println("receive?pong?message:?"?+?System.currentTimeMillis());Message?empty?=?new?Message();empty.setMessageType(MessageTypeEnum.EMPTY);return?empty;} }如此,對于自定義協議的消息處理過程已經完成,下面則是使用用Netty實現的客戶端與服務端代碼:
//?服務端 public?class?Server?{public?static?void?main(String[]?args)?{EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();try?{ServerBootstrap?bootstrap?=?new?ServerBootstrap();bootstrap.group(bossGroup,?workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,?1024).handler(new?LoggingHandler(LogLevel.INFO)).childHandler(new?ChannelInitializer<SocketChannel>()?{@Overrideprotected?void?initChannel(SocketChannel?ch)?throws?Exception?{ChannelPipeline?pipeline?=?ch.pipeline();?//?添加用于處理粘包和拆包問題的處理器pipeline.addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?4,?0,?4));pipeline.addLast(new?LengthFieldPrepender(4));//?添加自定義協議消息的編碼和解碼處理器pipeline.addLast(new?MessageEncoder());pipeline.addLast(new?MessageDecoder());//?添加具體的消息處理器pipeline.addLast(new?ServerMessageHandler());}});ChannelFuture?future?=?bootstrap.bind(8585).sync();future.channel().closeFuture().sync();}?catch?(InterruptedException?e)?{e.printStackTrace();}?finally?{bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} } public?class?Client?{public?static?void?main(String[]?args)?{NioEventLoopGroup?group?=?new?NioEventLoopGroup();Bootstrap?bootstrap?=?new?Bootstrap();try?{bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,?Boolean.TRUE).handler(new?ChannelInitializer<SocketChannel>()?{@Overrideprotected?void?initChannel(SocketChannel?ch)?throws?Exception?{ChannelPipeline?pipeline?=?ch.pipeline();//?添加用于解決粘包和拆包問題的處理器pipeline.addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?4,?0,?4));pipeline.addLast(new?LengthFieldPrepender(4));//?添加用于進行心跳檢測的處理器pipeline.addLast(new?IdleStateHandler(1,?2,?0));//?添加用于根據自定義協議將消息與字節流進行相互轉換的處理器pipeline.addLast(new?MessageEncoder());pipeline.addLast(new?MessageDecoder());//?添加客戶端消息處理器pipeline.addLast(new?ClientMessageHandler());}});ChannelFuture?future?=?bootstrap.connect("127.0.0.1",?8585).sync();future.channel().closeFuture().sync();}?catch?(InterruptedException?e)?{e.printStackTrace();}?finally?{group.shutdownGracefully();}} }運行上述代碼之后,我們可以看到客戶端和服務器分別打印了如下數據:
// 客戶端 receive pong message: 1555123429356 [trx: d05024d2]1. receive response: nice to meet you too! [trx: d05024d2]1. attachments: {hometown=wuhan, name=xufeng} [trx: 66ee1438]2. receive response: nice to meet you too! // 服務器 receive ping message: 1555123432279 [trx: f582444f]4. receive request: this is my 4 message. [trx: f582444f]4. attachments: {name=xufeng}3. 小結
本文首先將自定義協議與HTTP協議進行了對比,闡述了自定義協議的一些優點。然后定義了一份自定義協議,并且講解了協議中各個字節的含義。最后通過Netty對自定義協議進行了實現,并且實現了基于自定義協議的心跳功能。
總結
以上是生活随笔為你收集整理的Netty实现自定义协议的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据库里账号的密码,这样存放最安全!
- 下一篇: Java 异常处理中对于 finally