Java编程 VIIII 读取与序列化

编码与解码

Netty中 提供了 编解码 工具

基本实现

通过继承ByteToMessageDecoder 可以改写对数据的处理流程。

当执行out.add() 时,解码器会直接将添加的对象交给后续的handler 进行执行,这就意味着。每解码出一个对象,后续都会为其使用handler进行一次处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Byte2IntegerDecoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx , ByteBuf in, List<Object> out){
while (in.readableBytes()>=4){
int i = in.readInt();
Logger.info("解码出一个整数:" + i);
out.add(i);
}
}
}

public class IntegerProcessHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg)... {
Integer integer = (Integer) msg ;
Logger.info("打印出一个整数: " + integer);

}

}

public class Byte2IntegerDecoderTester{
@Test
public void testByteToIntegerDecoder() {
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>(){
protected void initChannel(EmbeddedChannel ch){
ch.pipeline().addLast(new Byte2IntegerDecoder());
ch.pipeline().addLast(new IntegerProcessHandler());
}
};
EmbeddedChannel channel = new EmbeddedChannel(i);
for(int j = 0 ; j< 100 ; j++){
Bytebuf buf = Unpooled.buffer();
buf.writeInt(j);
channel.writeInbound(buf);
}
...
}
}

ReplayingDecoder解码器

通过继承ReplayingDecoder , 传入的对象不再需要判断整个对象的长度。

而当长度不足时,会抛出异常被ReplayingDecoder接收,等待后续的数据传入。

1
2
3
4
5
6
7
8
public class Byte2IntegerReplayDecoder extends ReplayingDecoder{
@Override
public void decode(ChannelHandlerContext ctx, Bytebuf in , List<Object> out){
int i = in.readInt();
Logger.info("解码出一个整数" + i);
out.add(i) ;
}
}

整数的分包解码

通过使用枚举类定义状态信息,将handler 状态初始化为PHASE_1

后续流程执行 :

1.当第一次处理时 状态为PHASE_1 ,读取并将第一个整数存储在handler , 并将状态置为PHASE_2

2.当后续读取就绪时,状态为PHASE_2,执行PHASE_2逻辑,进行求和运算。向后续handler中发送最终结果。并将状态置为PHASE_1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class IntegerAddDecoder extends Replaying<IntegerAddDecoder.PHASE>{
enum PHASE {
PHASE_1,
PHASE_2
}
private int first ;
private int second ;
public IntegerAddDecoder()
{
super(PHASE.PHASE_1);
}
@Override
protected void decode(ChannelHandlerContext ctx , Bytebuf in ,List<Object> out) throws Exception{
switch (state())
{
case PHASE_1 :
first = in.readInt();
checkpoint(PHASE.PHASE_2);
break ;

case PHASE_2 :
second = in.readInt();
Integer sum = first + second ;
out.add(sum);
checkpoint(PHASE.PHASE_1);
break;
default :
break ;
}
}
}

字符的分包解码

和 对数值计算时的处理类似,在第一阶段先接收了一个整型 用于描述后续传入的字符串长度,在第二阶段则从传入数据中读取对应的长度解码为字符串。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class StringReplayDecoder extends ReplayingDecoder<StringReplayDecoder.PHASE>{
enum PHASE {
PHASE_1 ,
PHASE_2
}
private int length ;
private byte[] inBytes ;
public StringReplayDecoder(){
super(PHASE.PHASE_1)
}
@Override
protected void decode(ChannelHandlerContext ctx , Bytebuf in , List<Object> out ) throws Exception {
switch(state()){
case PHASE_1 :
length = in.readInt();
inBytes = new byte[length];
checkpoint(PHASE.PHASE_2);
break ;
case PHASE_2 :
in.readBytes(inBytes , 0 ,length);
out.add(new String(inBytes, "UTF-8"));
checkpoint(PHASE.PHASE_1);
break ;
default :
break ;
}
}
}

或者使用 ByteToMessageDecoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class StringIntegerHeaderDecoder extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext channelHandlerContext , Bytebuf buf , List<Object> out)...{
if(buf.readableBytes()<4){
return;
}
buf.markReaderIndex();
int length = buf.readInt();
if(buf.readableBytes() < length){
buf.resetReaderIndex();
return ;
}
byte[] inBytes = new byte[length];
buf.readBytes(inBytes , 0 , length);
out.add(new String(inBytes , "UTF-8"));
}
}

MessageToMessageDecoder

通过继承MessageToMessageDecoder 可以完成POJO对象间的 解码转换。

1
2
3
4
5
6
7
//定义
public class Integer2StringDecoder extends MessageToMessageDecoder<Integer> {
@Override
public void decode(ChannelHandlerContext ctx , Integer msg , List<Object> out)...{
out.add(String.valueOf(msg));
}
}

内置解码器

FixedLengthFrameDecoder 固定长度数据解码器 对固定长度的数据进行解码

LineBasedFrameDecoder 行分割数据包解码器 将换行符作为边界分隔符

DelimiterBasedFrameDecoder 可以自定义分隔符 用于作为边界

LengthFieldBasedFrameDecoder 自定义长度数据包解码器 根据原数据包中的数据长度进行解码提取

行分割数据包解码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class NettyOpenBoxDecoder{
static String spliter = "\\r\\n";
static String content = "string";
@Test
public void testLineBasedFrameDecoder(){
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>(){
protected void initChannel(EmbeddedChannel ch){
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringProcessHandler());
}
};
EmbeddedChannel channel = new EmbeddedChannel(i);
for (int j = 0 ; j < 100 ; j++){
int random = RandomUtil.randInMod(3);
Bytebuf buf = Unpooled.buffer();
for(int k = 0 ; k < random ; k++){
buf.writeBytes(content.getBytes("UTF=8"));
}
buf.writeBytes(spliter.getBytes("UTF-8"));
channel.writeInbound(buf);
}
}
}

DelimiterBasedFrameDecoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class NettyOpenBoxDecoder{
static String spliter = "\\t";
static String content = "string";
@Test
public void testDelimiterBasedFrameDecoder(){
try {
final Bytebuf delimiter = Unpooled.copiedBuffer(spliter.getBytes("UTF-8"));
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>(){
protected void initChannel(EmbeddedChannel ch){
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,true,delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringProcessHandler());
}
};
...
}
}
}

LengthFieldBasedFrameDecoder

LengthFieldBasedFrameDecoder 需要初始化的值

  1. maxFrameLength 发送的数据包最大长度
  2. lengFieldOffset 长度字段偏移量
  3. lengthFieldLength 长度字段所占字节数
  4. lengthAdjustment 长度的调整值 即在长度 和 实际消息体中间定义的字段长度
  5. initialBytesToStrip 丢弃的初始字节数

通过上述初始化字段的声明

我们得到了 1.包的最大长度 2.长度字段在协议中定义的位置 3.长度字段的长度 4.长度字段后其余字段所占用的长度 5.读取消息体的实际位置。

于是我们可以发现LengthFieldBasedFrameDecoder 最终得到的是一个 长度信息的位置和消息体的实际长度信息。这样便于更好的用于接收和处理实际的消息体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class NettyOpenBoxDecoder{
public static final int VERSION = 100 ;
public String content = "String";
@Test
public void testLengthFieldBasedFrameDecoder(){
try {
final LengthFieldBasedFrameDecoder spliter = new LengthFieldBasedFrameDecoder(1024,0,4,0,4);
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>(){
protected void initChannel(EmbeddedChannel ch){
ch.pipeline().addLast(spliter);
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new StringProcessHandler());
}
};
EmbeddedChannel Channel = new EmbeddedChannel();
for (int j = 1 ; j <= 100 ; j++){
ByteBuf buf = Unpooled.buffer();
String s = j + "次发送->" + content;
byte[] bytes = s.getBytes("UTF-8");
buf.writeInt(Bytes.length);
buf.writeBytes(bytes);
channel.writeInbound(buf);
}
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e){
e.printStackTrace();
} catch (UnsupportedEncodingException e){
e.printStackTrace();
}
}
}

Encoder

MessageToByteEncoder

将类型对象转换为字节码

1
2
3
4
5
6
7
public class Integer2ByteEncoder extends MessageToByteEncoder<Integer>{
@Override
protected void encode(ChannelHandlerContext ctx , Integer msg , ByteBuf out) ... {
out.writeInt(msg);
Logger.info("encoder Integer = " + msg);
}
}

MessageToMessageEncoder

1
2
3
4
5
6
7
8
9
10
11
12
public class String2IntergerEncoder extends MessageToMessageEncoder<String> {
@Override
protected void encode(ChannelHandlerContext c ,String s , List<Object> list)...{
char[] array = s.toCharArray();
for (char a : array){
if (a >= 48 && a <= 57){
list.add(new Integer(a));
}
}
}

}

编解码结合

通过继承ByteToMessageCodec 重写 encode 和decode 方法完成对编解码的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Byte2IntegerCodec extends ByteToMessageCodec<Integer>{
@Override
public void encode(ChannelHandlerContext ctx , Integer msg , ByteBuf out)... {
out.writeInt(msg);
System.out.println("write Integer = " + msg);

}
@Override
public void decode(ChannelHandlerContext ctx , ByteBuf in , List<Object> out){
if(in.readableBytes() >= 4){
int i = in.readInt();
System.out.println("Decoder i = "+i);
out.add(i);
}
}
}

使用组合其基类 组合定义好的 编码器和解码器

1
2
3
4
5
public class IntegerDuplexHandler extends CombinedChannelDuplexHandler<Byte2IntegerDecoder,Integer2ByteEncoder>{
public IntegerDuplexHandler(){
super(new Byte2IntegerDecoder(), new Integer2ByteEncoder());
}
}

序列化对象

包处理

因为TCP被设计为面向连接的流式传输。通过缓冲区来进行包的数据接收和发送。所以在传输过程中,无法通过类似UDP的包收发过程来确定包的完整型。而需要应用层的实现来保证包的完整性。

粘包和半包

粘包 : 接收端接收了一个Bytebuf,包含了发送端的多个ByteBuf,发送端的多个Bytebuf在接收端粘在了一起

半包 : Recevier将Sender的一个Bytebuf拆开了收,收到多个破碎的包,Receiver收到了Sender的一个Bytebuf的一小部分

所以要保证包的完整性,需要用到序列化传输。即通过一些格式化的标记来确定包中字段的范围,逻辑,引用关系。这样在收到包后可以根据已经定义好的规则,将字节数据转换为格式化数据进行处理

常用到的方法包括

自定义的解码器

通过在收发两端定义确定的包长字段和消息体信息,在解码端通过读取包长字段,接受对应长度的消息体。

序列化结构

Json

Protobuf

Json

1
2
3
4
5
6
7
8
9
10
11
12
13
@Data
public class JsonMsg{
private int id ;
private String content ;

public String convertToJson(){
return JsonUtil.PojoToJson(this);
}

public static JsonMsg parseFromJson(String json){
return JsonUtil.jsonToPoJo(json,JsonMsg.class);
}
}

构建json对象,编解码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class JsonMsgDemo{
public class JsonMsgDemo{
JsonMsg user = new JsonMsg();
user.setId(1000);
user.setContent("Something");
return user ;
}
}
@Test
public void setAndDesr() throws IOException {
JsonMsg message = buildMsg();
String json = message.convertToJson();
Logger.info("json:=" + json);
JsonMsg inMsg = JsonMsg.parseFromJson(json);
Logger.info("id=" + inMsg.getId());
Logger.info("content:=" + inMsg.getContent());
}

json的解析步骤仍然是 Head-Content → 将字节码转换为字符串 → 将字符串转换为序列化对象 → 提取对应的值。

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// 服务端
public class JsonServer {
public void runServer(){
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
...
b.childHandler(new ChannelInitializer<Socket Channel>(){
protected void initChannel(SocketChannel ch){
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,0,4,0,4));
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new JsonMsgDecoder());
}
});
}
}
static class MessageDecodeer extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)...{
String json =(String)msg;
JsonMsg jsonMsg = JsonMsg.parseFromJson(json);
Logger.info("收到一个数据包" + jsonMsg);
}
}

public static void main(String[] args) throws InterruptedException{
int port = NettyDemoConfig.SOCKET_SERVER_PORT ;
new JsonServer(port).runServer();
}

}

// 客户端

public class JsonSendClient {
static String content = "Something";
public void runClient(){
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
...
b.handler(new ChannelInitializer<SocketChannel>(){
protected void initChannel(SocketChannel ch)... {
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
}
});
ChannelFuture f = b.connect();
for(int i = 0 ; i < 1000 ; i++){
JsonMsg user = build(i, i+ "->" + content);
channel.writeAndFlush(user.convertToJson());
Logger.info("发送报文: "+ user.convertToJson());
}
channel.flush();
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
}catch (Exception e){
e.printStackTrace();
}finally{
...
}
}
public JsonMsg build(int id, String content){
JsonMsg user = new JsonMsg();
user.setId(id);
user.setContent(content);
return user ;
}
...
}

Protobuf

格式定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 格式定义
// [开始头部声明]
syntax = "proto3",
// [头部声明结束]

// [开始Java选项配置]
option java_package = "com.codfish.netty.protocol";
option java_outer_classname = "MsgProtos";
// [Java选项配置结束]

// [开始消息定义]
message Msg {
uint32 id = 1 ;
string content = 2 ;
}
// [消息定义结束]

通过上述定义格式 定义出对应的.protobuf文件,再通过向maven中安装插件,在执行过程中将所有的protobuf文件,作为POJO类加载到虚拟机中,允许对其调用。

序列化过程

1 .使用toByteArray() 方法将对象转换为字节码数组

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ProtobufDemo {
@Test
public void serAndDesr1() throws IOException {
MsgProtos.Msg messages = buildMsg();
byte[] data = message.toByteArray();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
outputStream.write(data);
data = outputStream.toByteArray();
MsgProtos.Msg inMsg = MsgProtos.Msg.parseFrom(data);
Logger.info("id:= " + inMsg.getId());
Logger.info("content:= " + inMsg.getContent());
}
}
  1. 使用方法将POJO对象读出/写入 IO流
1
2
3
4
5
6
7
8
9
10
11
12
public class ProtobufDemo{
@Test
public void serAndDesr2() throws IOException{
MsgProtos.Msg message = buildMsg();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
message.writeTo(outputStream);
ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
MsgProtos.Msg inMsg = MsgProtos.Msg.parseFrom(inputStream);
Logger.info("id:= " + inMsg.getId());
Logger.info("content:=" + inMsg.getContent());
}
}
  1. 使用writeDelimitedTo(OutputStream) 向输出种加入 字节数组长度字段
1
2
3
4
5
6
7
8
9
10
11
public class ProtobufDemo{
public void serAndDesr3() throws IOException{
MsgProtos.Msg message = buildMsg();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
message.writeDelimitedTo(outputStream) ;
ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
MsgProtos.Msg inMsg = MsgProtos.Msg.parseDelimited(inputStream);
Logger.info("id:=" + inMsg.getId());
Logger.info("content:=" + inMsg.getContent());
}
}

Protobuf 解码器

  1. ProtobufDecoder 和 ProtobufEncoder 编解码器 提供 字节数组到字符数组的转换过程
  2. ProtobufVarint32LengthFieldPrepender 长度编码器 ,前置一个数组长度值,在编码完成后填充
  3. ProtobufVarint32FrameDecoder 长度解码器

基于protobuf的 C-S 架构案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 服务端
public class ProtoBufserver{
EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
EventLoopGroup workerLoopGroup = new NioEventLoopGroup() ;
try {
b.childHandler(new ChannelInitializer<SocketChannel>(){
protected void initChannel(SocketChannel ch){
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast(new ProtobufDecoder(MsgProtos.Msg.getDefaultInstance()));
ch.pipeline().addLast(new ProtobufBussinessDecoder());
}
});
}

static class ProtobufBussinessDecoder extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx , Object msg )...{
MsgProtos.Msg protoMsg = (MsgProtos.Msg)msg ;
Logger.info("收到一个protobuf") ;
}
}
}

// 客户端
public class ProtoBufSendClient{
static String content = "Something";
public void runClient(){
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try{
// ..
b.childHandler(new ChannelInitializer<SocketChannel>(){
protected void initChannel(SocketChannel ch)...{
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder());
});
ChannelFuture f = b.connect();
f.sync();
Channel channel = f.channel();
for (int i = 0 ; i < 1000 ; i ++ ){
MsgProtos.Msg user = build(i, i + "->" + content);
channel.writeAndFlush(user);
Logger.info("发送报文数:" + i);
}
channel.flush();
}
public MsgProtos.Msg build(int id ,String content ){
MsgProtos.Msg.Builder builder = MsgProtos.Msg.newBuilder();
builder.setId(id);
builder.setContent(content);
return builder.build();
}

public static void main(String[] args) throws InterruptedException{
int port = NettyDemoConfig.SOCKET_SERVER_PORT;
String ip = NettyDemoConfig.SOCKET_SERVER_IP;
new ProtoBufSendClient(ip,port).runClient();
}
}

Java编程 VIIII 读取与序列化
http://gadoid.io/2025/05/10/Java编程-VIIII-读取与序列化/
作者
Codfish
发布于
2025年5月10日
许可协议