A first look at Netty (2)
Previous post: netty (1)
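1. TCP/IP stream transport
The previous post demonstrated the TCP packet-splitting problem that arises with binary stream transport. This post continues with text-based transport, where the following strategies can be used.
1.1 Marking the end with a special delimiter
HTTP uses \r\n\r\n to mark the end of the request headers, and the same sequence serves as the delimiter here. The idea is simple: as long as \r\n\r\n has not been seen, keep appending to the buffer; once it appears, a complete logical message has arrived and can be processed.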
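The buffering rule described above can be sketched in plain Java, without Netty. This is only an illustration under assumptions: `DelimiterFramer` and `feed` are hypothetical names, the sketch works on strings rather than bytes, and it ignores the maximum frame length that the Netty decoder used in this section enforces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Netty: accumulates incoming text and
// emits one frame each time the delimiter "\r\n\r\n" has been seen.
class DelimiterFramer {
    private static final String DELIMITER = "\r\n\r\n";
    private final StringBuilder buffer = new StringBuilder();

    // Feed one chunk as it arrived from the network. Returns the frames
    // (delimiter stripped) completed by this chunk, possibly none.
    public List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> frames = new ArrayList<>();
        int end;
        while ((end = buffer.indexOf(DELIMITER)) >= 0) {
            frames.add(buffer.substring(0, end));
            buffer.delete(0, end + DELIMITER.length());
        }
        return frames;
    }

    public static void main(String[] args) {
        DelimiterFramer framer = new DelimiterFramer();
        // The delimiter itself is split across the second and third chunk.
        System.out.println(framer.feed("GET / HTTP/1.1\r\nHost: a").size());
        System.out.println(framer.feed("\r\n\r").size());
        System.out.println(framer.feed("\nGET /b HTTP/1.1\r\n\r\nGE").size());
    }
}
```

Because the buffer persists between calls, it does not matter where TCP happens to cut the stream: a frame is released only once its delimiter is complete, and leftover bytes wait for the next chunk.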
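Server code:
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;

    public class Server {
        public static final String SERVER_DELIMITER = "\r\n\r\n";

        public static void main(String[] args) throws Exception {
            // 1. Create two event loop groups: one accepts client connections,
            //    the other handles data transfer on the accepted channels.
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
            try {
                // 2. Create the server bootstrap helper.
                ServerBootstrap b = new ServerBootstrap();
                b.group(pGroup, cGroup)
                 .channel(NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 1024)
                 // Buffer sizes belong to the accepted connections, hence childOption.
                 .childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
                 .childOption(ChannelOption.SO_RCVBUF, 32 * 1024)
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(SocketChannel sc) throws Exception {
                         // Split the inbound stream on the special delimiter.
                         ByteBuf buf = Unpooled.copiedBuffer(SERVER_DELIMITER.getBytes());
                         sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                         // Decode each frame into a String.
                         sc.pipeline().addLast(new StringDecoder());
                         sc.pipeline().addLast(new ServerHandler());
                     }
                 });
                // 3. Bind the port.
                ChannelFuture cf = b.bind(8765).sync();
                // Wait until the server's listening channel is closed.
                cf.channel().closeFuture().sync();
            } finally {
                pGroup.shutdownGracefully();
                cGroup.shutdownGracefully();
            }
        }
    }

    // ServerHandler.java
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;

    public class ServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(" server channel active... ");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // StringDecoder has already turned the frame into a String.
            String request = (String) msg;
            System.out.println("Server :" + request);
            // The reply must end with the delimiter so the client can frame it.
            String response = "Server response: " + request + Server.SERVER_DELIMITER;
            ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
            ctx.close();
        }
    }
Client code: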
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;

    public class Client {
        public static final String CLIENT_DELIMITER = "\r\n\r\n";

        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                 .channel(NioSocketChannel.class)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(SocketChannel sc) throws Exception {
                         // Frame the server's replies on the same delimiter.
                         ByteBuf buf = Unpooled.copiedBuffer(CLIENT_DELIMITER.getBytes());
                         sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                         sc.pipeline().addLast(new StringDecoder());
                         sc.pipeline().addLast(new ClientHandler());
                     }
                 });
                ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

                // An HTTP-style request; the final \r\n\r\n doubles as the delimiter.
                StringBuilder sb = new StringBuilder("GET /index.jsp HTTP/1.1\r\n");
                sb.append("Host: www.javathinker.org\r\n");
                sb.append("Accept: */*\r\n");
                sb.append("Accept-Language: zh-cn\r\n");
                sb.append("Accept-Encoding: gzip, deflate\r\n");
                sb.append("User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0)\r\n");
                sb.append("Connection: Keep-Alive\r\n\r\n");
                cf.channel().writeAndFlush(Unpooled.wrappedBuffer(sb.toString().getBytes()));
                // Thread.sleep(1000);
                // cf.addListener(ChannelFutureListener.CLOSE);

                // Wait until the client channel is closed.
                cf.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }

    // ClientHandler.java
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;

    public class ClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client channel active... ");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                String response = (String) msg;
                System.out.println("Client: " + response);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
1.2 Using fixed-length frames
This works like the code above; only the decoder in the pipeline needs to change.
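To make the effect of fixed-length framing concrete, here is a minimal plain-Java sketch with no Netty dependency. `FixedLengthFramer` and `feed` are hypothetical names chosen for illustration, and the sketch assumes ASCII payloads; it only mimics the idea of cutting the byte stream into frames of a fixed size.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Netty: cuts the incoming byte stream
// into frames of exactly frameLength bytes and keeps the remainder.
class FixedLengthFramer {
    private final int frameLength;
    private byte[] pending = new byte[0];

    public FixedLengthFramer(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive");
        }
        this.frameLength = frameLength;
    }

    // Feed one chunk as it arrived; returns the frames completed by it.
    public List<String> feed(byte[] chunk) {
        // Join the leftover bytes from earlier chunks with the new chunk.
        byte[] all = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, all, pending.length, chunk.length);

        List<String> frames = new ArrayList<>();
        int offset = 0;
        while (all.length - offset >= frameLength) {
            frames.add(new String(all, offset, frameLength, StandardCharsets.US_ASCII));
            offset += frameLength;
        }
        // Bytes that do not fill a whole frame wait for the next chunk.
        pending = Arrays.copyOfRange(all, offset, all.length);
        return frames;
    }

    public static void main(String[] args) {
        FixedLengthFramer framer = new FixedLengthFramer(5);
        System.out.println(framer.feed("abcdefg".getBytes(StandardCharsets.US_ASCII)));
        System.out.println(framer.feed("hij12".getBytes(StandardCharsets.US_ASCII)));
    }
}
```

The frame length counts bytes, not characters, so with a multi-byte encoding a 5-byte boundary can fall in the middle of a character. Messages shorter than the frame length also have to be padded by the sender, otherwise they stay in the buffer.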
The client pipeline then becomes:
    b.group(group)
     .channel(NioSocketChannel.class)
     .handler(new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel sc) throws Exception {
             // Every 5 bytes received form one frame.
             sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
             sc.pipeline().addLast(new StringDecoder());
             sc.pipeline().addLast(new ClientHandler());
         }
     });
2. Transferring Java objects
Official object echo example: http://netty.io/4.1/xref/io/netty/example/objectecho/package-summary.html
To transfer Java objects, you only need to configure a matching encoder (Object -> byte[]) and a matching decoder (byte[] -> Object).
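As a rough illustration of the two directions, the sketch below performs the Object -> byte[] and byte[] -> Object steps with plain JDK serialization. `SerializationRoundTrip`, `Ping`, `encode` and `decode` are hypothetical names; a real Netty codec additionally takes care of framing on the wire, which is left out here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationRoundTrip {
    // Hypothetical message type used only for this illustration.
    static class Ping implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final int count;

        Ping(String id, int count) {
            this.id = id;
            this.count = count;
        }
    }

    // Encoder direction: Object -> byte[]
    static byte[] encode(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        }
        return bos.toByteArray();
    }

    // Decoder direction: byte[] -> Object
    static Object decode(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] wire = encode(new Ping("1", 3));
        Ping back = (Ping) decode(wire);
        System.out.println(back.id + " " + back.count);
    }
}
```

Both sides need the same class on the classpath, and the class has to implement `Serializable`; that is the contract the `Req` and `Resp` classes in this section follow.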
2.1 Using Java Serializable
First, write a utility class that provides GZIP compression plus shared methods for sending and handling requests.
    package netty;

    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import netty.marshalling.Req;
    import netty.marshalling.Resp;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.File;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;

    /**
     * Created by carl.yu on 2016/11/7.
     */
    public class Utils {

        public static byte[] gzip(byte[] data) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            // Closing the GZIP stream writes the trailer, so close it before reading bos.
            try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
                gzip.write(data);
            }
            return bos.toByteArray();
        }

        public static byte[] ungzip(byte[] data) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
                byte[] buf = new byte[1024];
                int num;
                while ((num = gzip.read(buf, 0, buf.length)) != -1) {
                    bos.write(buf, 0, num);
                }
            }
            return bos.toByteArray();
        }

        // Client side: send two requests, each carrying a gzip-compressed image.
        public static void send(ChannelFuture cf) throws Exception {
            for (int i = 1; i <= 2; i++) {
                Req req = new Req();
                req.setId("" + i);
                req.setName("pro" + i);
                req.setRequestMessage("data message " + i);
                String path = "G:\\projects-helloworld\\lucene\\src\\main\\resources\\in\\" + i + ".jpg";
                req.setFileName(i + ".jpg");
                // readAllBytes reads the whole file; available() plus a single read() may not.
                byte[] data = Files.readAllBytes(Paths.get(path));
                req.setAttachment(Utils.gzip(data));
                cf.channel().writeAndFlush(req);
            }
        }

        // Server side: unpack the attachment, store it, and answer with a Resp.
        public static void recv(ChannelHandlerContext ctx, Object msg) throws Exception {
            Req req = (Req) msg;
            System.out.println("req:" + req);
            System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());
            byte[] attachment = Utils.ungzip(req.getAttachment());
            String path = "G:\\projects-helloworld\\lucene\\src\\main\\resources\\out\\" + req.getFileName();
            Files.write(Paths.get(path), attachment);

            Resp resp = new Resp();
            resp.setId(req.getId());
            resp.setName("resp" + req.getId());
            resp.setResponseMessage("response content " + req.getId());
            ctx.writeAndFlush(resp); //.addListener(ChannelFutureListener.CLOSE);
        }

        public static void main(String[] args) throws Exception {
            // Read the file.
            String readPath = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "006.jpg";
            byte[] data = Files.readAllBytes(Paths.get(readPath));
            System.out.println("Original file size: " + data.length);

            // Test compression and decompression.
            byte[] ret1 = gzip(data);
            System.out.println("Size after compression: " + ret1.length);
            byte[] ret2 = ungzip(ret1);
            System.out.println("Size after decompression: " + ret2.length);

            // Write the file back out.
            String writePath = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "006.jpg";
            Files.write(Paths.get(writePath), ret2);
        }
    }
Req:
    import java.io.Serializable;

    public class Req implements Serializable {
        // The field must be spelled serialVersionUID (lower-case s) to be recognized.
        private static final long serialVersionUID = 1L;

        private String id;
        private String name;
        private String requestMessage;
        private byte[] attachment;
        private String fileName;

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        public String getRequestMessage() { return requestMessage; }
        public void setRequestMessage(String requestMessage) { this.requestMessage = requestMessage; }

        public byte[] getAttachment() { return attachment; }
        public void setAttachment(byte[] attachment) { this.attachment = attachment; }

        public String getFileName() { return fileName; }
        public void setFileName(String fileName) { this.fileName = fileName; }

        @Override
        public String toString() {
            return "Req{" +
                    "id='" + id + '\'' +
                    ", name='" + name + '\'' +
                    ", requestMessage='" + requestMessage + '\'' +
                    ", fileName='" + fileName + '\'' +
                    '}';
        }
    }
Resp:
    import java.io.Serializable;

    public class Resp implements Serializable {
        private static final long serialVersionUID = 1L;

        private String id = "1";
        private String name = "aaa";
        private String responseMessage = "this is demo";

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        public String getResponseMessage() { return responseMessage; }
        public void setResponseMessage(String responseMessage) { this.responseMessage = responseMessage; }

        @Override
        public String toString() {
            return "Resp{" +
                    "id='" + id + '\'' +
                    ", name='" + name + '\'' +
                    ", responseMessage='" + responseMessage + '\'' +
                    '}';
        }
    }
Server code:
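    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.ssl.SslContext;
    import io.netty.handler.ssl.SslContextBuilder;
    import io.netty.handler.ssl.util.SelfSignedCertificate;

    public class ObjectEchoServer {
        static final boolean SSL = System.getProperty("ssl") != null;
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

        public static void main(String[] args) throws Exception {
            // Configure SSL.
            final SslContext sslCtx;
            if (SSL) {
                SelfSignedCertificate ssc = new SelfSignedCertificate();
                sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
            } else {
                sslCtx = null;
            }

            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .handler(new LoggingHandler(LogLevel.INFO))
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         if (sslCtx != null) {
                             p.addLast(sslCtx.newHandler(ch.alloc()));
                         }
                         // Encoder: Object -> bytes; decoder: bytes -> Object.
                         p.addLast(
                                 new ObjectEncoder(),
                                 new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                 new ObjectEchoServerHandler());
                     }
                 });

                // Bind and start to accept incoming connections.
                b.bind(PORT).sync().channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

    // ObjectEchoServerHandler.java
    public class ObjectEchoServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // Store the attachment and answer with a Resp (see Utils.recv).
            Utils.recv(ctx, msg);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
Client code: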
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    import io.netty.handler.ssl.SslContext;
    import io.netty.handler.ssl.SslContextBuilder;
    import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
    import io.netty.util.ReferenceCountUtil;

    public final class ObjectEchoClient {
        static final boolean SSL = System.getProperty("ssl") != null;
        static final String HOST = System.getProperty("host", "127.0.0.1");
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

        public static void main(String[] args) throws Exception {
            // Configure SSL.
            final SslContext sslCtx;
            if (SSL) {
                sslCtx = SslContextBuilder.forClient()
                        .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
            } else {
                sslCtx = null;
            }

            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                 .channel(NioSocketChannel.class)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         if (sslCtx != null) {
                             p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                         }
                         p.addLast(
                                 new ObjectEncoder(),
                                 new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                 new ObjectEchoClientHandler());
                     }
                 });

                // Start the connection attempt, then send the requests.
                ChannelFuture cf = b.connect(HOST, PORT).sync();
                Utils.send(cf);
                cf.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }

    // ObjectEchoClientHandler.java
    // The unused firstMessage list left over from the official example is omitted.
    public class ObjectEchoClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            try {
                Resp resp = (Resp) msg;
                System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
2.2 Serializing with JBoss Marshalling
    <dependency>
        <groupId>org.jboss.marshalling</groupId>
        <artifactId>jboss-marshalling-serial</artifactId>
        <version>2.0.0.Beta2</version>
    </dependency>
Netty ships ready-made codecs for JBoss Marshalling (MarshallingEncoder and MarshallingDecoder), so a small factory class is all that is needed:
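    import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
    import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallingDecoder;
    import io.netty.handler.codec.marshalling.MarshallingEncoder;
    import io.netty.handler.codec.marshalling.UnmarshallerProvider;
    import org.jboss.marshalling.MarshallerFactory;
    import org.jboss.marshalling.Marshalling;
    import org.jboss.marshalling.MarshallingConfiguration;

    public final class MarshallingCodeCFactory {

        /**
         * Creates the JBoss Marshalling decoder (MarshallingDecoder).
         * @return MarshallingDecoder
         */
        public static MarshallingDecoder buildMarshallingDecoder() {
            // Obtain the factory through the static method of the Marshalling utility
            // class; "serial" selects the Java-serialization factory.
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            // Create the MarshallingConfiguration and set the version to 5.
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            // Build the provider from marshallerFactory and configuration.
            UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
            // Build Netty's MarshallingDecoder; the arguments are the provider and the
            // maximum length of a single serialized message (1 MB here).
            return new MarshallingDecoder(provider, 1024 * 1024 * 1);
        }

        /**
         * Creates the JBoss Marshalling encoder (MarshallingEncoder).
         * @return MarshallingEncoder
         */
        public static MarshallingEncoder buildMarshallingEncoder() {
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
            // Build Netty's MarshallingEncoder, which serializes POJOs that implement
            // Serializable into a binary array.
            return new MarshallingEncoder(provider);
        }
    }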
Then configure the codec in the server and client pipelines; everything else stays the same.
    // Server
    b.group(pGroup, cGroup)
     .channel(NioServerSocketChannel.class)
     .option(ChannelOption.SO_BACKLOG, 1024)
     // Enable logging.
     .handler(new LoggingHandler(LogLevel.INFO))
     .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel sc) throws Exception {
             sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
             sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
             sc.pipeline().addLast(new ServerHandler());
         }
     });

    // Client
    b.group(group)
     .channel(NioSocketChannel.class)
     .handler(new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel sc) throws Exception {
             sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
             sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
             sc.pipeline().addLast(new ClientHandler());
         }
     });
2.3 Other serialization frameworks
Other serialization frameworks work the same way: all that is needed is a matching encoder and decoder, for example for Kryo or Google Protocol Buffers.
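The "matching encoder and decoder" idea can be sketched independently of any particular framework. Everything in the block below is hypothetical and for illustration only: `MessageCodec`, `Greeting` and `GreetingCodec` are made-up names, and the hand-written format based on `DataOutputStream` merely stands in for whatever a real framework would produce; it is not the Kryo or Protocol Buffers API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class CodecSketch {
    // Hypothetical abstraction: any serialization framework can be plugged in
    // as long as it offers these two directions.
    interface MessageCodec<T> {
        byte[] encode(T message) throws IOException;
        T decode(byte[] bytes) throws IOException;
    }

    // Hypothetical message type.
    static final class Greeting {
        final String id;
        final String text;

        Greeting(String id, String text) {
            this.id = id;
            this.text = text;
        }
    }

    // A hand-written stand-in for a framework-generated codec.
    static final class GreetingCodec implements MessageCodec<Greeting> {
        @Override
        public byte[] encode(Greeting g) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bos)) {
                out.writeUTF(g.id);
                out.writeUTF(g.text);
            }
            return bos.toByteArray();
        }

        @Override
        public Greeting decode(byte[] bytes) throws IOException {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
                // Fields are read in the order they were written.
                String id = in.readUTF();
                String text = in.readUTF();
                return new Greeting(id, text);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        MessageCodec<Greeting> codec = new GreetingCodec();
        Greeting back = codec.decode(codec.encode(new Greeting("1", "hi")));
        System.out.println(back.id + " " + back.text);
    }
}
```

In a Netty pipeline the two directions would live in an outbound encoder handler and an inbound decoder handler, which is the role the marshalling codecs play in section 2.2.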
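3. Miscellaneous
3.1 read_time_out and write_time_out
Socket communication with Netty usually also requires read and write timeouts. Netty implements these through handlers as well and provides two classes out of the box, ReadTimeoutHandler and WriteTimeoutHandler.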
    // The connection is closed when there is no inbound traffic
    // for 30 seconds.
    public class MyChannelInitializer extends ChannelInitializer<Channel> {
        public void initChannel(Channel channel) {
            channel.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(30));
            channel.pipeline().addLast("myHandler", new MyHandler());
        }
    }

    // Handler should handle the ReadTimeoutException.
    public class MyHandler extends ChannelDuplexHandler {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            if (cause instanceof ReadTimeoutException) {
                // do something
            } else {
                super.exceptionCaught(ctx, cause);
            }
        }
    }

    ServerBootstrap bootstrap = ...;
    ...
    bootstrap.childHandler(new MyChannelInitializer());
    ...
When a read times out, you can catch this exception, or simply drop the connection so that it does not tie up server resources.
The write side works the same way:
    // The connection is closed when a write operation cannot finish in 30 seconds.
    public class MyChannelInitializer extends ChannelInitializer<Channel> {
        public void initChannel(Channel channel) {
            channel.pipeline().addLast("writeTimeoutHandler", new WriteTimeoutHandler(30));
            channel.pipeline().addLast("myHandler", new MyHandler());
        }
    }

    // Handler should handle the WriteTimeoutException.
    public class MyHandler extends ChannelDuplexHandler {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            if (cause instanceof WriteTimeoutException) {
                // do something
            } else {
                super.exceptionCaught(ctx, cause);
            }
        }
    }

    ServerBootstrap bootstrap = ...;
    ...
    bootstrap.childHandler(new MyChannelInitializer());
    ...
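Conceptually, a read timeout is just bookkeeping on the time of the last inbound data. The sketch below shows that idea in plain Java. `ReadIdleTracker` is a hypothetical class, not a Netty type and not a description of how Netty implements its handlers internally, and the clock is passed in explicitly so that the behaviour is deterministic.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: remembers when data was last read and reports
// whether the configured read timeout has elapsed since then.
class ReadIdleTracker {
    private final long timeoutNanos;
    private long lastReadNanos;

    public ReadIdleTracker(long timeout, TimeUnit unit, long nowNanos) {
        this.timeoutNanos = unit.toNanos(timeout);
        this.lastReadNanos = nowNanos;
    }

    // Call whenever inbound data arrives; this resets the idle period.
    public void onRead(long nowNanos) {
        lastReadNanos = nowNanos;
    }

    // True once no data has been read for at least the timeout.
    public boolean isTimedOut(long nowNanos) {
        return nowNanos - lastReadNanos >= timeoutNanos;
    }

    public static void main(String[] args) {
        long start = 0L;
        ReadIdleTracker tracker = new ReadIdleTracker(30, TimeUnit.SECONDS, start);
        tracker.onRead(TimeUnit.SECONDS.toNanos(20));                           // data at t = 20 s
        System.out.println(tracker.isTimedOut(TimeUnit.SECONDS.toNanos(40)));  // idle for 20 s
        System.out.println(tracker.isTimedOut(TimeUnit.SECONDS.toNanos(50)));  // idle for 30 s
    }
}
```

A write timeout follows the same pattern, except that the clock starts when a write is issued and stops when that write completes.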
Reposted from: https://www.cnblogs.com/carl10086/p/6039593.html