编码与解码 Netty中 提供了 编解码 工具
基本实现
通过继承ByteToMessageDecoder 可以改写对数据的处理流程。
当执行out.add() 时,解码器会直接将添加的对象交给后续的handler 进行执行,这就意味着。每解码出一个对象,后续都会为其使用handler进行一次处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class Byte2IntegerDecoder extends ByteToMessageDecoder { @Override public void decode (ChannelHandlerContext ctx , ByteBuf in, List<Object> out) { while (in.readableBytes()>=4 ){ int i = in.readInt(); Logger.info("解码出一个整数:" + i); out.add(i); } } }public class IntegerProcessHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx,Object msg) ... { Integer integer = (Integer) msg ; Logger.info("打印出一个整数: " + integer); } }public class Byte2IntegerDecoderTester { @Test public void testByteToIntegerDecoder () { ChannelInitializer i = new ChannelInitializer <EmbeddedChannel>(){ protected void initChannel (EmbeddedChannel ch) { ch.pipeline().addLast(new Byte2IntegerDecoder ()); ch.pipeline().addLast(new IntegerProcessHandler ()); } }; EmbeddedChannel channel = new EmbeddedChannel (i); for (int j = 0 ; j< 100 ; j++){ Bytebuf buf = Unpooled.buffer(); buf.writeInt(j); channel.writeInbound(buf); } ... } }
ReplayingDecoder解码器 通过继承ReplayingDecoder , 传入的对象不再需要判断整个对象的长度。
而当长度不足时,会抛出异常被ReplayingDecoder接收,等待后续的数据传入。
1 2 3 4 5 6 7 8 public class Byte2IntegerReplayDecoder extends ReplayingDecoder { @Override public void decode (ChannelHandlerContext ctx, Bytebuf in , List<Object> out) { int i = in.readInt(); Logger.info("解码出一个整数" + i); out.add(i) ; } }
整数的分包解码 通过使用枚举类定义状态信息,将handler 状态初始化为PHASE_1
后续流程执行 :
1.当第一次处理时 状态为PHASE_1 ,读取并将第一个整数存储在handler , 并将状态置为PHASE_2
2.当后续读取就绪时,状态为PHASE_2,执行PHASE_2逻辑,进行求和运算。向后续handler中发送最终结果。并将状态置为PHASE_1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class IntegerAddDecoder extends Replaying <IntegerAddDecoder.PHASE>{ enum PHASE { PHASE_1, PHASE_2 } private int first ; private int second ; public IntegerAddDecoder () { super (PHASE.PHASE_1); } @Override protected void decode (ChannelHandlerContext ctx , Bytebuf in ,List<Object> out) throws Exception{ switch (state()) { case PHASE_1 : first = in.readInt(); checkpoint(PHASE.PHASE_2); break ; case PHASE_2 : second = in.readInt(); Integer sum = first + second ; out.add(sum); checkpoint(PHASE.PHASE_1); break ; default : break ; } } }
字符的分包解码 和 对数值计算时的处理类似,在第一阶段先接收了一个整型 用于描述后续传入的字符串长度,在第二阶段则从传入数据中读取对应的长度解码为字符串。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class StringReplayDecoder extends ReplayingDecoder <StringReplayDecoder.PHASE>{ enum PHASE { PHASE_1 , PHASE_2 } private int length ; private byte [] inBytes ; public StringReplayDecoder () { super (PHASE.PHASE_1) } @Override protected void decode (ChannelHandlerContext ctx , Bytebuf in , List<Object> out ) throws Exception { switch (state()){ case PHASE_1 : length = in.readInt(); inBytes = new byte [length]; checkpoint(PHASE.PHASE_2); break ; case PHASE_2 : in.readBytes(inBytes , 0 ,length); out.add(new String (inBytes, "UTF-8" )); checkpoint(PHASE.PHASE_1); break ; default : break ; } } }
或者使用 ByteToMessageDecoder
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class StringIntegerHeaderDecoder extends ByteToMessageDecoder { @Override protected void decode (ChannelHandlerContext channelHandlerContext , Bytebuf buf , List<Object> out) ...{ if (buf.readableBytes()<4 ){ return ; } buf.markReaderIndex(); int length = buf.readInt(); if (buf.readableBytes() < length){ buf.resetReaderIndex(); return ; } byte [] inBytes = new byte [length]; buf.readBytes(inBytes , 0 , length); out.add(new String (inBytes , "UTF-8" )); } }
MessageToMessageDecoder 通过继承MessageToMessageDecoder 可以完成POJO对象间的 解码转换。
1 2 3 4 5 6 7 public class Integer2StringDecoder extends MessageToMessageDecoder <Integer> { @Override public void decode (ChannelHandlerContext ctx , Integer msg , List<Object> out) ...{ out.add(String.valueOf(msg)); } }
内置解码器 FixedLengthFrameDecoder 固定长度数据解码器 对固定长度的数据进行解码
LineBasedFrameDecoder 行分割数据包解码器 将换行符作为边界分隔符
DelimiterBasedFrameDecoder 可以自定义分隔符 用于作为边界
LengthFieldBasedFrameDecoder 自定义长度数据包解码器 根据原数据包中的数据长度进行解码提取
行分割数据包解码器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class NettyOpenBoxDecoder { static String spliter = "\\r\\n" ; static String content = "string" ; @Test public void testLineBasedFrameDecoder () { ChannelInitializer i = new ChannelInitializer <EmbeddedChannel>(){ protected void initChannel (EmbeddedChannel ch) { ch.pipeline().addLast(new LineBasedFrameDecoder (1024 )); ch.pipeline().addLast(new StringDecoder ()); ch.pipeline().addLast(new StringProcessHandler ()); } }; EmbeddedChannel channel = new EmbeddedChannel (i); for (int j = 0 ; j < 100 ; j++){ int random = RandomUtil.randInMod(3 ); Bytebuf buf = Unpooled.buffer(); for (int k = 0 ; k < random ; k++){ buf.writeBytes(content.getBytes("UTF=8" )); } buf.writeBytes(spliter.getBytes("UTF-8" )); channel.writeInbound(buf); } } }
DelimiterBasedFrameDecoder 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class NettyOpenBoxDecoder { static String spliter = "\\t" ; static String content = "string" ; @Test public void testDelimiterBasedFrameDecoder () { try { final Bytebuf delimiter = Unpooled.copiedBuffer(spliter.getBytes("UTF-8" )); ChannelInitializer i = new ChannelInitializer <EmbeddedChannel>(){ protected void initChannel (EmbeddedChannel ch) { ch.pipeline().addLast(new DelimiterBasedFrameDecoder (1024 ,true ,delimiter)); ch.pipeline().addLast(new StringDecoder ()); ch.pipeline().addLast(new StringProcessHandler ()); } }; ... } } }
LengthFieldBasedFrameDecoder LengthFieldBasedFrameDecoder 需要初始化的值
maxFrameLength 发送的数据包最大长度
lengFieldOffset 长度字段偏移量
lengthFieldLength 长度字段所占字节数
lengthAdjustment 长度的调整值 即在长度 和 实际消息体中间定义的字段长度
initialBytesToStrip 丢弃的初始字节数
通过上述初始化字段的声明
我们得到了 1.包的最大长度 2.长度字段在协议中定义的位置 3.长度字段的长度 4.长度字段后其余字段所占用的长度 5.读取消息体的实际位置。
于是我们可以发现LengthFieldBasedFrameDecoder 最终得到的是一个 长度信息的位置和消息体的实际长度信息。这样便于更好的用于接收和处理实际的消息体
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class NettyOpenBoxDecoder { public static final int VERSION = 100 ; public String content = "String" ; @Test public void testLengthFieldBasedFrameDecoder () { try { final LengthFieldBasedFrameDecoder spliter = new LengthFieldBasedFrameDecoder (1024 ,0 ,4 ,0 ,4 ); ChannelInitializer i = new ChannelInitializer <EmbeddedChannel>(){ protected void initChannel (EmbeddedChannel ch) { ch.pipeline().addLast(spliter); ch.pipeline().addLast(new StringDecoder (Charset.forName("UTF-8" ))); ch.pipeline().addLast(new StringProcessHandler ()); } }; EmbeddedChannel Channel = new EmbeddedChannel (); for (int j = 1 ; j <= 100 ; j++){ ByteBuf buf = Unpooled.buffer(); String s = j + "次发送->" + content; byte [] bytes = s.getBytes("UTF-8" ); buf.writeInt(Bytes.length); buf.writeBytes(bytes); channel.writeInbound(buf); } Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e){ e.printStackTrace(); } catch (UnsupportedEncodingException e){ e.printStackTrace(); } } }
Encoder MessageToByteEncoder 将类型对象转换为字节码
1 2 3 4 5 6 7 public class Integer2ByteEncoder extends MessageToByteEncoder <Integer>{ @Override protected void encode (ChannelHandlerContext ctx , Integer msg , ByteBuf out) ... { out.writeInt(msg); Logger.info("encoder Integer = " + msg); } }
MessageToMessageEncoder 1 2 3 4 5 6 7 8 9 10 11 12 public class String2IntergerEncoder extends MessageToMessageEncoder <String> { @Override protected void encode (ChannelHandlerContext c ,String s , List<Object> list) ...{ char [] array = s.toCharArray(); for (char a : array){ if (a >= 48 && a <= 57 ){ list.add(new Integer (a)); } } } }
编解码结合 通过继承ByteToMessageCodec 重写 encode 和decode 方法完成对编解码的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class Byte2IntegerCodec extends ByteToMessageCodec <Integer>{ @Override public void encode (ChannelHandlerContext ctx , Integer msg , ByteBuf out) ... { out.writeInt(msg); System.out.println("write Integer = " + msg); } @Override public void decode (ChannelHandlerContext ctx , ByteBuf in , List<Object> out) { if (in.readableBytes() >= 4 ){ int i = in.readInt(); System.out.println("Decoder i = " +i); out.add(i); } } }
使用组合其基类 组合定义好的 编码器和解码器
1 2 3 4 5 public class IntegerDuplexHandler extends CombinedChannelDuplexHandler <Byte2IntegerDecoder,Integer2ByteEncoder>{ public IntegerDuplexHandler () { super (new Byte2IntegerDecoder (), new Integer2ByteEncoder ()); } }
序列化对象 包处理 因为TCP被设计为面向连接的流式传输。通过缓冲区来进行包的数据接收和发送。所以在传输过程中,无法通过类似UDP的包收发过程来确定包的完整型。而需要应用层的实现来保证包的完整性。
粘包和半包 粘包 : 接收端接收了一个Bytebuf,包含了发送端的多个ByteBuf,发送端的多个Bytebuf在接收端粘在了一起
半包 : Recevier将Sender的一个Bytebuf拆开了收,收到多个破碎的包,Receiver收到了Sender的一个Bytebuf的一小部分
所以要保证包的完整性,需要用到序列化传输。即通过一些格式化的标记来确定包中字段的范围,逻辑,引用关系。这样在收到包后可以根据已经定义好的规则,将字节数据转换为格式化数据进行处理
常用到的方法包括
自定义的解码器
通过在收发两端定义确定的包长字段和消息体信息,在解码端通过读取包长字段,接受对应长度的消息体。
序列化结构
Json
Protobuf
Json 1 2 3 4 5 6 7 8 9 10 11 12 13 @Data public class JsonMsg { private int id ; private String content ; public String convertToJson () { return JsonUtil.PojoToJson(this ); } public static JsonMsg parseFromJson (String json) { return JsonUtil.jsonToPoJo(json,JsonMsg.class); } }
构建json对象,编解码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class JsonMsgDemo { public class JsonMsgDemo { JsonMsg user = new JsonMsg (); user.setId(1000 ); user.setContent("Something" ); return user ; } }@Test public void setAndDesr () throws IOException { JsonMsg message = buildMsg(); String json = message.convertToJson(); Logger.info("json:=" + json); JsonMsg inMsg = JsonMsg.parseFromJson(json); Logger.info("id=" + inMsg.getId()); Logger.info("content:=" + inMsg.getContent()); }
json的解析步骤仍然是 Head-Content → 将字节码转换为字符串 → 将字符串转换为序列化对象 → 提取对应的值。
案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 public class JsonServer { public void runServer () { EventLoopGroup bossLoopGroup = new NioEventLoopGroup (1 ); EventLoopGroup workerLoopGroup = new NioEventLoopGroup (); try { ... b.childHandler(new ChannelInitializer <Socket Channel>(){ protected void initChannel (SocketChannel ch) { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder (1024 ,0 ,4 ,0 ,4 )); ch.pipeline().addLast(new StringDecoder (CharsetUtil.UTF_8)); ch.pipeline().addLast(new JsonMsgDecoder ()); } }); } } static class MessageDecodeer extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) ...{ String json = (String)msg; JsonMsg jsonMsg = JsonMsg.parseFromJson(json); Logger.info("收到一个数据包" + jsonMsg); } } public static void main (String[] args) throws InterruptedException{ int port = NettyDemoConfig.SOCKET_SERVER_PORT ; new JsonServer (port).runServer(); } }public class JsonSendClient { static String content = "Something" ; public void runClient () { EventLoopGroup workerLoopGroup = new NioEventLoopGroup (); try { ... b.handler(new ChannelInitializer <SocketChannel>(){ protected void initChannel (SocketChannel ch) ... { ch.pipeline().addLast(new LengthFieldPrepender (4 )); ch.pipeline().addLast(new StringEncoder (CharsetUtil.UTF_8)); } }); ChannelFuture f = b.connect(); for (int i = 0 ; i < 1000 ; i++){ JsonMsg user = build(i, i+ "->" + content); channel.writeAndFlush(user.convertToJson()); Logger.info("发送报文: " + user.convertToJson()); } channel.flush(); ChannelFuture closeFuture = channel.closeFuture(); closeFuture.sync(); }catch (Exception e){ e.printStackTrace(); }finally { ... } } public JsonMsg build (int id, String content) { JsonMsg user = new JsonMsg (); user.setId(id); user.setContent(content); return user ; } ... }
Protobuf 格式定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 syntax = "proto3" ,option java_package = "com.codfish.netty.protocol" ;option java_outer_classname = "MsgProtos" ; message Msg { uint32 id = 1 ; string content = 2 ; }
通过上述定义格式 定义出对应的.protobuf文件,再通过向maven中安装插件,在执行过程中将所有的protobuf文件,作为POJO类加载到虚拟机中,允许对其调用。
序列化过程 1 .使用toByteArray() 方法将对象转换为字节码数组
1 2 3 4 5 6 7 8 9 10 11 12 13 public class ProtobufDemo { @Test public void serAndDesr1 () throws IOException { MsgProtos.Msg messages = buildMsg(); byte [] data = message.toByteArray(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream (); outputStream.write(data); data = outputStream.toByteArray(); MsgProtos.Msg inMsg = MsgProtos.Msg.parseFrom(data); Logger.info("id:= " + inMsg.getId()); Logger.info("content:= " + inMsg.getContent()); } }
使用方法将POJO对象读出/写入 IO流
1 2 3 4 5 6 7 8 9 10 11 12 public class ProtobufDemo { @Test public void serAndDesr2 () throws IOException{ MsgProtos.Msg message = buildMsg(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream (); message.writeTo(outputStream); ByteArrayInputStream inputStream = new ByteArrayInputStream (outputStream.toByteArray()); MsgProtos.Msg inMsg = MsgProtos.Msg.parseFrom(inputStream); Logger.info("id:= " + inMsg.getId()); Logger.info("content:=" + inMsg.getContent()); } }
使用writeDelimitedTo(OutputStream) 向输出种加入 字节数组长度字段
1 2 3 4 5 6 7 8 9 10 11 public class ProtobufDemo { public void serAndDesr3 () throws IOException{ MsgProtos.Msg message = buildMsg(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream (); message.writeDelimitedTo(outputStream) ; ByteArrayInputStream inputStream = new ByteArrayInputStream (outputStream.toByteArray()); MsgProtos.Msg inMsg = MsgProtos.Msg.parseDelimited(inputStream); Logger.info("id:=" + inMsg.getId()); Logger.info("content:=" + inMsg.getContent()); } }
Protobuf 解码器
ProtobufDecoder 和 ProtobufEncoder 编解码器 提供 字节数组到字符数组的转换过程
ProtobufVarint32LengthFieldPrepender 长度编码器 ,前置一个数组长度值,在编码完成后填充
ProtobufVarint32FrameDecoder 长度解码器
基于protobuf的 C-S 架构案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public class ProtoBufserver { EventLoopGroup bossLoopGroup = new NioEventLoopGroup (); EventLoopGroup workerLoopGroup = new NioEventLoopGroup () ; try { b.childHandler(new ChannelInitializer <SocketChannel>(){ protected void initChannel (SocketChannel ch) { ch.pipeline().addLast(new ProtobufVarint32FrameDecoder ()); ch.pipeline().addLast(new ProtobufDecoder (MsgProtos.Msg.getDefaultInstance())); ch.pipeline().addLast(new ProtobufBussinessDecoder ()); } }); } static class ProtobufBussinessDecoder extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx , Object msg ) ...{ MsgProtos.Msg protoMsg = (MsgProtos.Msg)msg ; Logger.info("收到一个protobuf" ) ; } } }public class ProtoBufSendClient { static String content = "Something" ; public void runClient () { EventLoopGroup workerLoopGroup = new NioEventLoopGroup (); try { b.childHandler(new ChannelInitializer <SocketChannel>(){ protected void initChannel (SocketChannel ch) ...{ ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender ()); ch.pipeline().addLast(new ProtobufEncoder ()); }); ChannelFuture f = b.connect(); f.sync(); Channel channel = f.channel(); for (int i = 0 ; i < 1000 ; i ++ ){ MsgProtos.Msg user = build(i, i + "->" + content); channel.writeAndFlush(user); Logger.info("发送报文数:" + i); } channel.flush(); } public MsgProtos.Msg build (int id ,String content ) { MsgProtos.Msg.Builder builder = MsgProtos.Msg.newBuilder(); builder.setId(id); builder.setContent(content); return builder.build(); } public static void main (String[] args) throws InterruptedException{ int port = NettyDemoConfig.SOCKET_SERVER_PORT; String ip = NettyDemoConfig.SOCKET_SERVER_IP; new ProtoBufSendClient (ip,port).runClient(); } }