Netty 5用户指南
問題
現如今我們使用通用的應用程序或者類庫來實現系統之間地互相訪問,比如我們經常使用一個HTTP客戶端來從web服務器上獲取信息,或者通過web service來執行一個遠程的調用。
然而,有時候一個通用的協議和他的實現并沒有覆蓋一些場景。比如我們無法使用一個通用的HTTP服務器來處理大文件、電子郵件、近實時消息比如財務信息和多人游戲數據。我們需要一個合適的協議來處理一些特殊的場景。例如你可以實現一個優化的Ajax的聊天應用、媒體流傳輸或者是大文件傳輸的HTTP服務器,你甚至可以自己設計和實現一個新的協議來準確地實現你的需求。
?
另外不可避免的事情是你不得不處理這些私有協議來確保和原有系統的互通。這個例子將會展示如何快速實現一個不影響應用程序穩定性和性能的協議。
解決方案
Netty是一個提供異步事件驅動的網絡應用框架,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。
換句話說,Netty是一個NIO框架,使用它可以簡單快速地開發網絡應用程序,比如客戶端和服務端的協議。Netty大大簡化了網絡程序的開發過程比如TCP和UDP的 Socket的開發。
“快速和簡單”并不意味著應用程序會有難維護和性能低的問題,Netty是一個精心設計的框架,它從許多協議的實現中吸收了很多的經驗比如FTP、SMTP、HTTP、許多二進制和基于文本的傳統協議,Netty在不降低開發效率、性能、穩定性、靈活性情況下,成功地找到了解決方案。
有一些用戶可能已經發現其他的一些網絡框架也聲稱自己有同樣的優勢,所以你可能會問是Netty和它們的不同之處。答案就是Netty的哲學設計理念。Netty從第一天開始就為用戶提供了用戶體驗最好的API以及實現設計。正是因為Netty的設計理念,才讓我們得以輕松地閱讀本指南并使用Netty。
入門指南
這個章節會介紹Netty核心的結構,并通過一些簡單的例子來幫助你快速入門。當你讀完本章節你馬上就可以用Netty寫出一個客戶端和服務端。
如果你在學習的時候喜歡“自頂向下(top-down)”的方法,那你可能需要要從第二章《架構概述》開始,然后再回到這里。
開始之前
運行本章節中的兩個例子最低要求是:Netty的最新版本(Netty5)和JDK1.6及以上。最新的Netty版本在項目下載頁面可以找到。為了下載到正確的JDK版本,請到你喜歡的網站下載。
閱讀本章節過程中,你可能會對相關類有疑惑,關于這些類的詳細的信息請請參考API說明文檔。為了方便,所有文檔中涉及到的類名字都會被關聯到一個在線的API說明。當然如果有任何錯誤信息、語法錯誤或者你有任何好的建議來改進文檔說明,那么請聯系Netty社區。
DISCARD服務(丟棄服務,指的是會忽略所有接收的數據的一種協議)
世界上最簡單的協議不是”Hello,World!”,是DISCARD,他是一種丟棄了所有接受到的數據,并不做有任何的響應的協議。
為了實現DISCARD協議,你唯一需要做的就是忽略所有收到的數據。讓我們從處理器的實現開始,處理器是由Netty生成用來處理I/O事件的。
package io.netty.example.discard;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerAdapter;/*** Handles a server-side channel.*/ public class DiscardServerHandler extends ChannelHandlerAdapter { // (1)@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)// Discard the received data silently.((ByteBuf) msg).release(); // (3)}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();} }到目前為止一切都還比較順利,我們已經實現了DISCARD服務的一半功能,剩下的需要編寫一個main()方法來啟動服務端的DiscardServerHandler。
package io.netty.example.discard;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;/*** Discards any incoming data.*/ public class DiscardServer {private int port;public DiscardServer(int port) {this.port = port;}public void run() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap(); // (2)b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3).childHandler(new ChannelInitializer<SocketChannel>() { // (4)@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new DiscardServerHandler());}}).option(ChannelOption.SO_BACKLOG, 128) // (5).childOption(ChannelOption.SO_KEEPALIVE, true); // (6)// Bind and start to accept incoming connections.ChannelFuture f = b.bind(port).sync(); // (7)// Wait until the server socket is closed.// In this example, this does not happen, but you can do that to gracefully// shut down your server.f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {int port;if (args.length > 0) {port = Integer.parseInt(args[0]);} else {port = 8080;}new DiscardServer(port).run();} }恭喜!你已經完成熟練地完成了第一個基于Netty的服務端程序。
觀察接收到的數據
現在我們已經編寫出我們第一個服務端,我們需要測試一下他是否真的可以運行。最簡單的測試方法是用telnet 命令。例如,你可以在命令行上輸入telnet localhost 8080或者其他類型參數。
然而我們能說這個服務端是正常運行了嗎?事實上我們也不知道因為他是一個discard服務,你根本不可能得到任何的響應。為了證明他仍然是在工作的,讓我們修改服務端的程序來打印出他到底接收到了什么。
我們已經知道channelRead()方法是在數據被接收的時候調用。讓我們放一些代碼到DiscardServerHandler類的channelRead()方法。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf in = (ByteBuf) msg;try {while (in.isReadable()) { // (1)System.out.print((char) in.readByte());System.out.flush();}} finally {ReferenceCountUtil.release(msg); // (2)} }如果你再次運行telnet命令,你將會看到服務端打印出了他所接收到的消息。
完整的discard server代碼放在了io.netty.example.discard包下面。
ECHO服務(響應式協議)
到目前為止,我們雖然接收到了數據,但沒有做任何的響應。然而一個服務端通常會對一個請求作出響應。讓我們學習怎樣在ECHO協議的實現下編寫一個響應消息給客戶端,這個協議針對任何接收的數據都會返回一個響應。
和discard server唯一不同的是把在此之前我們實現的channelRead()方法,返回所有的數據替代打印接收數據到控制臺上的邏輯。因此,需要把channelRead()方法修改如下:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.write(msg); // (1)ctx.flush(); // (2) }1.?ChannelHandlerContext對象提供了許多操作,使你能夠觸發各種各樣的I/O事件和操作。這里我們調用了write(Object)方法來逐字地把接受到的消息寫入。請注意不同于DISCARD的例子我們并沒有釋放接受到的消息,這是因為當寫入的時候Netty已經幫我們釋放了。
2. ctx.write(Object)方法不會使消息寫入到通道上,他被緩沖在了內部,你需要調用ctx.flush()方法來把緩沖區中數據強行輸出。或者你可以用更簡潔的cxt.writeAndFlush(msg)以達到同樣的目的。
如果你再一次運行telnet命令,你會看到服務端會發回一個你已經發送的消息。
完整的echo服務的代碼放在了io.netty.example.echo包下面。
TIME服務(時間協議的服務)
在這個部分被實現的協議是TIME協議。和之前的例子不同的是在不接受任何請求時他會發送一個含32位的整數的消息,并且一旦消息發送就會立即關閉連接。在這個例子中,你會學習到如何構建和發送一個消息,然后在完成時主動關閉連接。
因為我們將會忽略任何接收到的數據,而只是在連接被創建發送一個消息,所以這次我們不能使用channelRead()方法了,代替他的是,我們需要覆蓋channelActive()方法,下面的就是實現的內容:
package io.netty.example.time;public class TimeServerHandler extends ChannelHandlerAdapter {@Overridepublic void channelActive(final ChannelHandlerContext ctx) { // (1)final ByteBuf time = ctx.alloc().buffer(4); // (2)time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));final ChannelFuture f = ctx.writeAndFlush(time); // (3)f.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) {assert f == future;ctx.close();}}); // (4)}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();} }因此你需要在write()方法返回的ChannelFuture完成后調用close()方法,然后當他的寫操作已經完成他會通知他的監聽者。請注意,close()方法也可能不會立馬關閉,他也會返回一個ChannelFuture。
為了測試我們的time服務如我們期望的一樣工作,你可以使用UNIX的rdate命令
$ rdate -o <port> -p <host>Port是你在main()函數中指定的端口,host使用locahost就可以了。
Time客戶端
不像DISCARD和ECHO的服務端,對于TIME協議我們需要一個客戶端因為人們不能把一個32位的二進制數據翻譯成一個日期或者日歷。在這一部分,我們將會討論如何確保服務端是正常工作的,并且學習怎樣用Netty編寫一個客戶端。
在Netty中,編寫服務端和客戶端最大的并且唯一不同的使用了不同的BootStrap和Channel的實現。請看一下下面的代碼:
package io.netty.example.time;public class TimeClient {public static void main(String[] args) throws Exception {String host = args[0];int port = Integer.parseInt(args[1]);EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap(); // (1)b.group(workerGroup); // (2)b.channel(NioSocketChannel.class); // (3)b.option(ChannelOption.SO_KEEPALIVE, true); // (4)b.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new TimeClientHandler());}});// Start the client.ChannelFuture f = b.connect(host, port).sync(); // (5)// Wait until the connection is closed.f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();}} }正如你看到的,他和服務端的代碼是不一樣的。ChannelHandler是如何實現的?他應該從服務端接受一個32位的整數消息,把他翻譯成人們能讀懂的格式,并打印翻譯好的時間,最后關閉連接:
package io.netty.example.time;import java.util.Date;public class TimeClientHandler extends ChannelHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf m = (ByteBuf) msg; // (1)try {long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;System.out.println(new Date(currentTimeMillis));ctx.close();} finally {m.release();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();} }這樣看起來非常簡單,并且和服務端的那個例子的代碼也相差不多。然而,處理器有時候會因為拋出IndexOutOfBoundsException而拒絕工作。在下個部分我們會討論為什么會發生這種情況。
流數據的傳輸處理
一個小的Socket Buffer問題
在基于流的傳輸里比如TCP/IP,接收到的數據會先被存儲到一個socket接收緩沖里。不幸的是,基于流的傳輸并不是一個數據包隊列,而是一個字節隊列。即使你發送了2個獨立的數據包,操作系統也不會作為2個消息處理而僅僅是作為一連串的字節而言。因此這是不能保證你遠程寫入的數據就會準確地讀取。舉個例子,讓我們假設操作系統的TCP/TP協議棧已經接收了3個數據包:
由于基于流傳輸的協議的這種普通的性質,在你的應用程序里讀取數據的時候會有很高的可能性被分成下面的片段。
因此,一個接收方不管他是客戶端還是服務端,都應該把接收到的數據整理成一個或者多個更有意思并且能夠讓程序的業務邏輯更好理解的數據。在上面的例子中,接收到的數據應該被構造成下面的格式:
第一個解決方案
現在讓我們回到TIME客戶端的例子上。這里我們遇到了同樣的問題,一個32字節數據是非常小的數據量,他并不見得會被經常拆分到到不同的數據段內。然而,問題是他確實可能會被拆分到不同的數據段內,并且拆分的可能性會隨著通信量的增加而增加。
最簡單的方案是構造一個內部的可積累的緩沖,直到4個字節全部接收到了內部緩沖。下面的代碼修改了TimeClientHandler的實現類修復了這個問題
package io.netty.example.time;import java.util.Date;public class TimeClientHandler extends ChannelHandlerAdapter {private ByteBuf buf;@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {buf = ctx.alloc().buffer(4); // (1)}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) {buf.release(); // (1)buf = null;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf m = (ByteBuf) msg;buf.writeBytes(m); // (2)m.release();if (buf.readableBytes() >= 4) { // (3)long currentTimeMillis = (buf.readInt() - 2208988800L) * 1000L;System.out.println(new Date(currentTimeMillis));ctx.close();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();} }第二個解決方案
盡管第一個解決方案已經解決了Time客戶端的問題了,但是修改后的處理器看起來不那么的簡潔,想象一下如果由多個字段比如可變長度的字段組成的更為復雜的協議時,你的ChannelHandler的實現將很快地變得難以維護。
正如你所知的,你可以增加多個ChannelHandler到ChannelPipeline?,因此你可以把一整個ChannelHandler拆分成多個模塊以減少應用的復雜程度,比如你可以把TimeClientHandler拆分成2個處理器:
- TimeDecoder處理數據拆分的問題
- TimeClientHandler原始版本的實現
幸運地是,Netty提供了一個可擴展的類,幫你完成TimeDecoder的開發。
package io.netty.example.time;public class TimeDecoder extends ByteToMessageDecoder { // (1)@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)if (in.readableBytes() < 4) {return; // (3)}out.add(in.readBytes(4)); // (4)} }現在我們有另外一個處理器插入到ChannelPipeline里,我們應該在TimeClient里修改ChannelInitializer?的實現:
b.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());} });如果你是一個大膽的人,你可能會嘗試使用更簡單的解碼類ReplayingDecoder。不過你還是需要參考一下API文檔來獲取更多的信息。
public class TimeDecoder extends ReplayingDecoder { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<object width="300" height="150">out) {out.add(in.readBytes(4));}}此外,Netty還提供了更多可以直接拿來用的解碼器使你可以更簡單地實現更多的協議,幫助你避免開發一個難以維護的處理器實現。請參考下面的包以獲取更多更詳細的例子:
- 對于二進制協議請看io.netty.example.factorial
- 對于基于文本協議請看io.netty.example.telnet
用POJO代替ByteBuf
我們已經討論了所有的例子,到目前為止一個消息的消息都是使用ByteBuf作為一個基本的數據結構。在這一部分,我們會改進TIME協議的客戶端和服務端的例子,用POJO替代ByteBuf。在你的ChannelHandlerS中使用POJO優勢是比較明顯的。通過從ChannelHandler中提取出ByteBuf的代碼,將會使ChannelHandler的實現變得更加可維護和可重用。在TIME客戶端和服務端的例子中,我們讀取的僅僅是一個32位的整形數據,直接使用ByteBuf不會是一個主要的問題。然后,你會發現當你需要實現一個真實的協議,分離代碼變得非常的必要。首先,讓我們定義一個新的類型叫做UnixTime。
package io.netty.example.time;import java.util.Date;public class UnixTime {private final int value;public UnixTime() {this((int) (System.currentTimeMillis() / 1000L + 2208988800L));}public UnixTime(int value) {this.value = value;}public int value() {return value;}@Overridepublic String toString() {return new Date((value() - 2208988800L) * 1000L).toString();} }現在我們可以修改下TimeDecoder類,返回一個UnixTime,以替代ByteBuf
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {if (in.readableBytes() < 4) {return;}out.add(new UnixTime(in.readInt())); }下面是修改后的解碼器,TimeClientHandler不再有任何的ByteBuf代碼了。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) {UnixTime m = (UnixTime) msg;System.out.println(m);ctx.close(); }是不是變得更加簡單和優雅了?相同的技術可以被運用到服務端。讓我們修改一下TimeServerHandler的代碼。
@Override public void channelActive(ChannelHandlerContext ctx) {ChannelFuture f = ctx.writeAndFlush(new UnixTime());f.addListener(ChannelFutureListener.CLOSE); }現在,僅僅需要修改的是ChannelHandler的實現,這里需要把UnixTime對象重新轉化為一個ByteBuf。不過這已經是非常簡單了,因為當你對一個消息編碼的時候,你不需要再處理拆包和組裝的過程。
package io.netty.example.time;public class TimeEncoder extends ChannelHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {UnixTime m = (UnixTime) msg;ByteBuf encoded = ctx.alloc().buffer(4);encoded.writeInt(m.value());ctx.write(encoded, promise); // (1)} }進一步簡化操作,你可以使用MessageToByteEncode:
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {@Overrideprotected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {out.writeInt(msg.value());} }最后的任務就是在TimeServerHandler之前把TimeEncoder插入到ChannelPipeline。但這是不那么重要的工作。
關閉你的應用
關閉一個Netty應用往往只需要簡單地通過shutdownGracefully()方法來關閉你構建的所有的NioEventLoopGroupS.當EventLoopGroup被完全地終止,并且對應的所有channels都已經被關閉時,Netty會返回一個Future對象。
概述
在這一章節中,我們會快速地回顧下如果在熟練掌握Netty的情況下編寫出一個健壯能運行的網絡應用程序。在Netty接下去的章節中還會有更多更相信的信息。我們也鼓勵你去重新復習下在io.netty.example包下的例子。請注意社區一直在等待你的問題和想法以幫助Netty的持續改進,Netty的文檔也是基于你們的快速反饋上。
總結
以上是生活随笔為你收集整理的Netty 5用户指南的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SaaS 中 6 种常见 UI 入职模式
- 下一篇: 【动态规划】关于转移方程的简单理解