Java编程 VIII Netty

Reactor模式

Reactor 线程 : 负责响应IO 事件,并分发到Handlers处理器

Handlers 处理器 : 非阻塞的执行业务处理逻辑

Connection Per Thread 模式

每当收到一个socket请求。为对应请求单独创建一个线程来进行 数据的收发处理。

在高并发场景下,会创造出大量的线程, 线程的创建,销毁,切换会消耗大量资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class ConnectionPerThread  implements Runnable{
public void run(){
try {
ServerSocket serverSocket = new ServerSocket(NioDemoConfig.SOCKET_SERVER_PORT);
while(!Thread.interrupted()){
Socket socket = serverSocket.accept();
Handler handler = new Handler(socket);
new Thread(handler).start();
}

}catch(IOException ex) {
...
}

}
static class Handler implements Runnable{
final Socket socket ;
Handler(Socket s ){
socket = s ;
}
public void run(){
while (true){
try {
byte[] input = new byte[1024];
socket.getInputStream().read(input);
byte[] output = null;
socket.getOutputStream().write(output)
}catch (IOException e) {
...
}
}
}
}
}

单线程Reactor

将Reactor 与 Handlers 放入同一个线程中

void attach(Object o)

将对象附加到选择键

Object attachment()

从选择键中取出对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class EchoServerReactor implements Runnable {
Selector selector ;
ServerSocketChannel serverSocket ;
EchoServerReactor() throws IOException{
// ... 打开选择器,serverSocket 连接监听通道
// 注册serverSocket的accept 监听事件
SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
sk.attach(new AcceptorHandler());
}
public void run () {
// 选择器轮询
try {
while(!Thread.interrupted()){
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while(it.hasNext()){
SelectionKey sk = it.next();
dispatch(sk);
}
selected.clear();
}
}catch(IOException ex) { ex.printStackTrace();}
}
void dispatch(SelectionKey k){
Runnable handler = (Runnable)(k.attachment());
if(handler != null){
handler.run()
}
}
class AcceptorHandler implements Runnable {
public void run(){
SocketChannel channel = serverSocket.accept();
if (channel != null) new EchoHandler(selector,channel);
}

}

}

单线程Reactor 的实现逻辑是, 定义了监听器selector 和通道,先注册一个ACCEPT事件放入Selector中,当监听到有socket连接请求后,触发其所附带的程序来创建新的channel 用于处理新的连接请求,并将该通道同样加入到selector中,这样在后续的select() 阻塞中 即可以继续监听OP_ACCEPT请求创建新的读取通道,也可以监听读取通道的就绪状态,来读取数据

Reactor : 监听ACCEPT事件的处理程序

Handlers : 触发ACCEPT事件后新创建的用于处理OP_READ的处理程序

缺点 : 当某个Handlers阻塞后,会导致的所有的Handler都被阻塞,甚至影响OP_ACCEPT的监听

多线程Reactor

将 Handlers 与Reactor 线程隔离, 减少Handlers阻塞时,对OP_ACCEPT处理的影响

创建子反应器充分利用多核心资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class MultiThreadEchoServerReactor{
ServerSocketChannel serverSocket ;
AtomicInteger next = new AtomicInteger(0);
Selector[] selectors = new Selector[2];
SubReactor[] subReactors = null ;
MultiThreadEchoServerReactor() throws IOException {
selectors[0] = Selector.open();
selectors[1] = Selector.open();
serverSocket = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress("127.0.0.1",18899);
serverSocket.socket().bind(address);
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register(selectors[0],OP_ACCEPT);
sk.attach(new AcceptorHandler());
SubReactor subReactor1 = new SubReactor(selectors[0]);
SubReactor subReactor2 = new SubReactor(selectors[1]);
subReactors = new SubReactor[]{subReactor1,subReactor2};
}
private void startService(){
new Thread(subReactors[0].start());
new Thread(subReactors[1].start());
}
class SubReactor implements Runnable{
final Selector selector ;
public SubReactor(Selector selector){
this.selector = selector ;
}
public void run(){
try {
while (! Thread.interrupted()){
selector.select();
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> it = keySet.iterator();
while(it.hasNext()){
SelectionKey sk = it.next();
dispatch(sk);
}
keySet.clear();
}
}catch(IOException e){
ex.printStackTrace();
}
}
void dispatch(SelectionKey sk){
Runnable handler = (Runnable)sk.attachment();
if(handler != null){
handler.run();
}
}
}
class AcceptorHandler implements Runnable{
public void run(){
try {
SocketChannel channel = serverSocket.accept();
if (channel != null) new MultiThreadEchoHandler(selectors[1],channel);
}catch (IOException e){
e.printStackTrace();
}
}
}

public static void main(String[] args ) throws IOException{
MultiThreadEchoServerReactor server = new MultiThreadEchoServerReactor();
server.startService();

}

}

Netty

Netty 是一个基于Java NIO库的高性能网络框架

Netty中提供了一个 集成器 ServerBootstrap 用于对整个执行过程进行配置。

通过创建EventLoopGroup 组 来创建不同的reactor 和 handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class NettyDiscardServer {
private final int serverPort ;
ServerBootstrap b = new ServerBootstrap();
public NettyDiscardServer(int port) {
this.serverPort = port ;
}
public void runServer(){
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
b.group(bossLoopGroup , workerLoopGroup);
b.channel(NioServerSocketChannel.class);
b.localAddress(serverPort);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.childHandler(new ChannelInitializer<SocketChannel>(){
protected void initChannel (SocketChannel ch){
ch.pipeline().addLast(new NettyDiscardHandler());
}
});
ChannelFuture channelFuture = b.bind().sync();
Logger.info("服务器启动");
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (Exception e){
e.printStackTrace();
} finally {
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();
}
}
public static void main(String[] args){
int port =NettyDemoConfig.SOCKET_SERVER_PORT;
new NettyDiscardServer(port).runServer();
}
}

Netty中根据不同的协议定义了不同协议类型channel

1
2
3
4
5
6
7
8
9
10
NioSocketChannel  异步非阻塞TCP Socket 通道
NioServerSocketChannel 异步非阻塞TCP Socket 服务端监听通道
NioDatagramChannel 异步非阻塞 UDP 传输通道
NioSctpChannel 异步非阻塞 Sctp 传输通道
NioSctpServerChannel 异步非阻塞 Sctp 服务端监听通道
OioSocketChannel 同步非阻塞TCP Socket 通道
OioServerSocketChannel 同步非阻塞TCP Socket 服务端监听通道
OioDatagramChannel 同步非阻塞 UDP 传输通道
OioSctpChannel 同步非阻塞 Sctp 传输通道
OioSctpServerChannel 同步非阻塞 Sctp 服务端监听通道

Netty中的 Reactor

Netty 反应器中装载了两个属性

SingleThreadEventExecutor (Thread) 用于 具体的 通道处理

NioEventLoop (Selector) 用于监听注册到该选择器中的 通道就绪状态

一个EventLoop 可以注册多个通道

Netty中的 Handler

Netty中的Handler 包含 ChannelInboundHandler 和 ChannelOutboundHandler

ChannelInboundHandler 通过 实现 ChannelInboundHandlerAdapter 来完成,用于处理接收到OP_READ 事件

ChannelOutboundHandler 通过 实现 ChannelOutboundHandlerAdapter 来完成,用于处理接收到OP_WRITE 事件

Netty中的pipeline

Netty 的每个通道都配有一个 ChannelPipeline,可以串联多个 Handler,形成链式处理结构。

其定义了一个ChannelPipeLine 类,其是一个双向链表,支持 :

  1. 在执行完某个handler 后,后续仍存在其他处理器,交由后续处理器继续执行
  2. 当后续没有其他处理器后,IO事件处理完成
  3. 当需要在某个处理器中中止处理,可以将流水线终止执行

入站 IO 事件只能交由Inbound 处理器处理 (由前往后)

出站 IO 事件只能交由Outbound 处理器处理 (由后往前)

引导类 Bootstrap

Netty提供了引导类 来帮助完成 client 和server 端的连接配置

父子通道

NioServerSocketChannel 用于进行 连接监听

NioSocketChannel 用于进行 数据处理

对于Netty来说 负责连接监听的叫做父通道,而在连接建立后进行数据处理的通道被称为子通道

EventLoopGroup

Netty 中的Selector 实现是多线程版本的

每一个EventLoop 相当于一个子反应器

而 通过EventLoopGroup 可以创建一个 子反应器组,创建多个线程和多个反应器(一 一对应)来进行IO事件的查询,处理,分发。

Bootstrap启动与配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ServerBootstrap b = new ServerBootstrap();  // 创建引导类
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1) ; // boss 轮询组 监听连接请求
EventLoopGroup workLoopGroup = new NioEventLoopGroup(); // worker 轮询组 处理具体业务
b.group(bossLoopGroup,workLoopGroup); // 将 组加入引导类配置
b.channel(NioServerSocketChannel.class); // 设置通道IO类型
b.localAddress(new InetSocketAddress(port)); // 设置监听端口
b.option(ChannelOption.SO_KEEPALIVE, true); // 配置通道选项, 这里设置保活为true
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); //
b.childHandler(new ChannelInitializer<SocketChannel>(){
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new NettyDiscardHandler());
}
})

ChannelFuture channelFuture = b.bind().sync(); // 绑定到异步任务
Loggger.info("服务器启动成功");

ChannelFuture closeFuture = ChannelFuture.channel().closeFuture();
closeFuture.sync(); // 自我阻塞

workLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();

ChannelOption

SO_RCVBUF / SO_SNDBUF

设置收发缓冲区大小

TCP_NODELAY

即时发送数据

SO_KEEPALIVE

保活

SO_REUSEADDR

复用地址

SO_LINGER

SO_BACKLOG

Channel

1
2
3
4
5
6
7
8
9
10
11
12
ChannelFuture connect(SocketAddress address)
// 连接远程服务器
ChannelFuture bind(SocketAddress address)
// 绑定监听地址
ChannelFuture close()
// 关闭通道
Channel read()
// 读取通道数据
Channel write(Object o)
// 写入通道数据
Channel flush()
// 立即将数据写入对端

Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ChannelInboundHandler  // 入站处理器
channelRegistered()
// 将通道注册到通道流水线中
channelActive()
// 激活通道
channelRead();
// 回调进行数据读取
channelReadComplete()
// 通知缓冲区读取完成
channelInactive()
// 连接断开或不可用
exceptionCaught()
// 异常捕获

ChannelOutboundHandler
bind() // 监听地址绑定
connect() // 连接服务端
write() // 通知底层IO写入
flush() // 清空底层缓存区,立即写出到对端
disConnect() // 断开连接
close() // 关闭底层通道

ChannelInitializer

通过抽象接口完成 将 新的handler插入的 pipeline 的过程。

1
2
3
4
5
b.childHandler(new ChannelInitializer<SocketChannel>(){
protected void initChannel(SocketChannel ch){
ch.pipeline().addLast(new NettyDiscardHandler())
}
})

Pipeline

pipeline的入栈处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class InPipeline{
static class SimpleInHandlerA extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)... {
Logger.info("入站处理A,被回调");
super.channelRead(ctx,msg);
}
}
static class SimpleInHandlerB extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)... {
Logger.info("入站处理B,被回调");
super.channelRead(ctx,msg);
}
}
static class SimpleInHandlerC extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)... {
Logger.info("入站处理C,被回调");
super.channelRead(ctx,msg);
}
}
public void testPipelineInBound() {
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>(){
protected void initChannel(EmbeddedChannel ch){
ch.pipeline().addLast(new SimpleInHandlerA() );
ch.pipeline().addLast(new SimpleInHandlerB() );
ch.pipeline().addLast(new SimpleInHandlerC() );
}
};
EmbeddedChannel channel = new EmbeddedChannel(i);
ByteBuf buf = Unpooled.buffer();
buf.writeInt(1);
channel.writeInbound(buf);
}
}
/*
输出
入站处理器A : 被回调
入站处理器B : 被回调
入站处理器C : 被回调
*/

pipeline 出站处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class InPipeline{
static class SimpleOuthandlerA extends ChannelOutboundHandlerAdapter{
@Override
public void channelWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)... {
Logger.info("出站处理A,被回调");
super.channelWrite(ctx,msg,promise);
}
}
static class SimpleOutHandlerB extends ChannelOutboundHandlerAdapter{
@Override
public void channelWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)... {
Logger.info("出站处理B,被回调");
super.channelWrite(ctx,msg,promise);
}
}
static class SimpleOutHandlerC extends ChannelOutboundHandlerAdapter{
@Override
public void channelWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)... {
Logger.info("出站处理C,被回调");
super.channelWrite(ctx,msg,promise);
}
}
public void testPipelineInBound() {
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>(){
protected void initChannel(EmbeddedChannel ch){
ch.pipeline().addLast(new SimpleInHandlerA() );
ch.pipeline().addLast(new SimpleInHandlerB() );
ch.pipeline().addLast(new SimpleInHandlerC() );
}
};
EmbeddedChannel channel = new EmbeddedChannel(i);
ByteBuf buf = Unpooled.buffer();
buf.writeInt(1);
channel.writeOutbound(buf);
}
}
/*
输出
入站处理器C : 被回调
入站处理器B : 被回调
入站处理器A : 被回调
*/

上下文 ChannelHandlerContext

Netty中的Handler 是无状态的,通过上下文来维护

Channel Channelpipeline 和 ChannelHandlerContext 的关系

Channel - > 通道 标识某一个socket连接

Channelpipeline - > 一条流水线 用于对channel中的数据进行处理

ChannelHandlerContext - > 流水线的节点,用于维护状态信息和Handler

HeadContext 与 TailContext

TailContext 既作为一个上下文存储对象,也是一个入站处理器

1
2
3
4
5
6
7
8
9
public class DefaultChannelPipeline implements ChannelPipeline {
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
@Override
public void channelRead(ChannelHandlerContext ctx , Object msg){
...
}
}
}
}

HeadContext 即需要实现 ChannelInboudHandler 又需要实现ChannellnboundHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class DefaultChannelPipeline implements ChannelPipeline{
final class HeadContext extends AbstractChannelHandlerContext implements ChannelInboundHandler
, ChannelOutBoundHandler {
private final Unsafe unsafe ;
@Override
public void channelRead(ChannelHandlerContext ctx , Object msg){
ctx.fireChannelRead(msg);

}
@Override
public void read(ChannelHandlerContext ctx){
unsafe.beginRead();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg , ChannelPromise promise){
unsafe.write(msg , promise);
}
}

Pipeline的传递

每一个上下文对象通过findContextInbound() 来获取 链表中的下一个上下文对象,之后通过调用next.invokeChannelRead(msg) 进行调用,在调用的invokeChannel方法中,再次查询下一个链表对象并执行

Pipeline的截断

如果想对Pipeline进行截断 则可以

  1. 不调用基类的channelRead 终止流水线
  2. 或者不调用ctx.fireChannelRead 中断查询后续上下文的过程,直接返回。

流水线热插拔

ChannelPipeline 提供了一些方法来对流水线中的上下文进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface ChannelPipeline extends Iterable <Entry<String,ChannelHandler>>{
ChannelPipeline addFirst(String name , ChannelHandler handler );

ChannelPipeline addLast(String name , ChannelHandler handler );

ChannelPipeline addBefore(String baseName , String name , ChannelHandler handler );

ChannelPipeline addAfter(String baseName , String name , ChannelHandler handler );

ChannelPipeline remove(ChannelHandler handler );

ChannelPipeline remove(String handler );

ChannelPipeline removeFirst();

ChannelPipeline removeFirst();
}

ByteBuf

ByteBuf 是一个字节容器,定义了一个字节数组,并在容器内分为4个区域

  1. 废弃 : 已经被使用过的字节内容
  2. 可读 : 待被读取的字节内容
  3. 可写 :可以向其中写入的区域
  4. 扩容 :剩余待使用的区域

ByteBuf 通过3个定义指针来 界定这四个区域

readerIndex 读指针 所处位置 开始可以读取字节

writerIndex 写指针 所处位置 开始可以写入字节

maxCapacity 最大容量 当前ByteBuf 支持的最大容量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//方法 
// 容量 :
capacity() // Bytebuf的容量
maxCapacity() // Bytebuf支持的最大容量

// 写入
isWritable() // 是否可写
writableBytes() //可写入的字节数
maxWritable() // 最大可写入的字节数
writeBytes(byte[] src) // 把入参src字节数组中的数据写入Bytebuf
writeType() // 写入数据类型的数据
setType() // 设置基础数据类型
markWriterIndex()
resetWriterIndex() // 存储写入位置指针 / 恢复写指针

// 读取
isReadable() // 是否可读
readableBytes() // 可读的字节数
readBytes(byte[] dst) // 将ByteBuf中的数据读到 数组dst中
readType() // 以某种数据类型读取
getType() // 获取当前的数据类型
markReaderIndex()
resetReaderIndex() // 存储当前的读指针 / 恢复读指针

池化与非池化

池化和非池化的区别在于内存的分配与回收策略不同。池化是在初始化时预先分配一块固定大小的内存区域,后续的 ByteBuf 实例均通过在这块内存中进行切片和复用来完成分配与释放。相比之下,非池化则在每次创建 ByteBuf 时都直接申请新的内存,使用完毕后再交由系统释放。

池化的优势在于显著减少了内存分配和回收的开销,降低了内存碎片化风险,提升了性能,适合高并发、高频率的场景;但其缺点是初始内存占用较高。非池化虽然按需分配,初始内存占用小,但在频繁创建和销毁时会带来较大性能负担,容易产生内存碎片,更适用于低频访问或短生命周期的缓存场景。

堆内存和直接内存(系统)

在缓冲区管理中,除了池化与非池化的分配策略,底层还涉及具体内存类型的选择,主要分为:Java 堆内存(heap buffer)系统直接内存(direct buffer / off-heap) 。二者在使用中各有优势与限制。

使用堆内存(Heap Buffer)

堆内存是 JVM 管理的内存,ByteBuf 实例实际分配在 Java 堆上,其生命周期由 GC 控制。

优点

  • 分配开销小,分配快(使用 JVM 自带的堆内存分配器)
  • 与 JVM 数据结构兼容性好,读取数据不需要额外复制
  • GC 可自动回收,适合生命周期不确定的缓存

缺点

  • 需要将数据从堆复制到内核空间 ,才能通过 SocketChannel.write() 等发送数据(即涉及一次内存复制)
  • 在高频 IO 场景中可能增加 CPU 使用率和 GC 压力
  • 长期运行中容易造成 GC pause,影响实时性

使用系统直接内存(Direct Buffer)

直接内存位于 JVM 堆外,由操作系统管理,通过 sun.misc.UnsafeByteBuffer.allocateDirect() 分配。Netty 封装后用于实现零拷贝(zero-copy)传输。

优点

  • 避免堆与内核缓冲区之间的复制,提高 IO 性能
  • 支持 DMA(Direct Memory Access)等底层优化机制
  • 避免 GC 管理,减少长时间运行系统的 GC 压力

缺点

  • 分配成本高(系统调用开销),释放慢(不是 GC 自动管理)
  • 容易内存泄漏:若不主动释放或池化,堆外内存不可回收
  • 对调试和监控不友好:GC 日志和内存分析工具无法追踪

Bytebuf 浅拷贝

ByteBuf slice()

ByteBuf slice(int index,int length)

使用上述方法对对象进行浅拷贝

拷贝后的切片 是当前已被读取Bytebuf的长度大小。

读指针 被设置为 0 , 写指针和最大容量都为切片的读位置

使用duplicate() 来进行对原对象的整体复制。

上述方法都是对原数据的浅层复制,即是对某段存储的引用。当原对象被释放后就不再能被使用了。可以通过增加引用计数的方式来保持对象的可用状态。

零拷贝

Netty 提供了多种方法 帮助程序减少内存复制

  1. CompositeByteBuf组合缓冲区类。将多个ByteBuf合并为一个逻辑上的Bytebuf
  2. Netty提供了 浅层复制操作,可以将ByteBuff分解为多个共享同一存储区域的ByteBuf
  3. 可以直接将文件缓冲区的数据发送到目标通道,避免通道和缓冲之间的多次复制
  4. 提供了一系列包装类,避免转换过程中的内存拷贝。
  5. 通过使用直接内存进行socket读写,减少了从JVM到直接内存的拷贝过程

CompositeByteBuf

通过合并多个Bytebuf队形,重新组织他们的关系,增加了查询过程,但是减少了创建新的调用对象的复制过程

WrappedBuffer()

通过将原始类型数组包装为对象,减少对象创建过程

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
public class NettyEchoServer{
public void runServer(){
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
EventLoopGroup workLoopGroup = new NioEventLoopGroup();
// 省略 1.反应器轮询组 2.通道类型 4.通道选项
b.childHandler(new ChannelInitializer<SocketChannel>(){
protected void initChannel(SocketChannel ch) ... {
ch.pipeline().addLast(NettyEchoServerHandler.INSTANCE);
}
});
// 省略启动,等待,关闭
}
// main方法调用
}

// ServerHandler 实现
@ChannelHandler.Sharable
public class NettyEchoServerHandler extends ChannelInboundHandlerAdapter{
public static final NettyEchoServerHandler INSTANCE = new NettyEchoServerHandler();

@Override
public void channelRead(ChannelHandlerContext ctx , Object msg)...{
ByteBuf in = (ByteBuf) msg;
Logger.info("msg type : " +(in.hasArray() ? "堆内存":"直接内存"));
int len = in.readableBytes();
byte[] arr = new byte[len];
in.getBytes(0,arr);
Logger.info("Server received:" + new String(arr,"UTF-8"));
Logger.info("写回前,msg.refCnt:" +((ByteBuf) msg).refCnt());
ChannelFuture f = ctx.writeAndFlush(msg);
f.addListener((ChannelFuture futureListener)->{
Logger.info("写回后,msg.refCnt:" + ((ByteBuf) msg).refCnt());
});
}
}

// 客户端

public class NettyEchoClient{
private int serverPort ;
private String serverIp ;
Bootstrap b = new Bootstrap();

public NettyEchoClient(String ip , int port){
this.serverPort = port ;
this.serverIp = ip ;
}
public void runClient(){
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try{
b.group(workerLoopGroup);
b.channel(NioSocketChannel.class);
b.remoteAddress(serverIp,serverPort);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.handler(new ChannelInitializer<SocketChannel>(){
protected void initChannel(SocketChannel ch)...{
ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE);
}
});
ChannelFuture f = b.connect();
f.addListener((ChannelFuture futureListener)->
{
if(futureListener.isSuccess()){
Logger.info("连接成功");
}else {
Logger.info("连接失败");
}
});
f.sync();
Channel channel = f.channel();
Scanner scanner = new Scanner(System.in);
Print.tcfo("请输入发送内容:");
while (scanner.hasNext()){
String next = scanner.next();
byte[] bytes = (Dateutil.getNow() + ">>" + next).getBytes("UTF-8");
ByteBuf buffer = channel.alloc().buffer();
buffer.writeBytes(bytes);
channel.writeAndFlush(buffer);
Print.tcfo("请输入发送内容");
}
} catch (Exception e){
e.printStackTrace();
} finally{
workerLoopGroup.shutdownGracefully();
}
}
// main方法

}

// 客户端handler

@ChannelHandler.Sharable
public class NettyEchoClientHandler extends ChannelInboundHandlerAdapter{
public static final NettyEchoClientHandler INSTANCE = new NettyEchoClientHandler();
@Override
public void channelRead(ChannelHandlerContext ctx , Object msg)...{
ByteBuf byteBuf = (ByteBuf)msg;
int len = byteBuf.readableBytes();
byte[] arr = new byte[len];
byteBuf.getBytes(0,arr);
Logger.info("client received :" + new String(arr,"UTF-8"));

byteBuf.release();

}

}

Java编程 VIII Netty
http://gadoid.io/2025/05/08/Java编程-VIII-Netty/
作者
Codfish
发布于
2025年5月8日
许可协议