RPC框架实现

什么是框架

框架(Framework)是一种为特定类型的软件系统预定义好整体结构、执行流程和核心功能的开发工具。

它约定了程序运行的生命周期、模块划分和调用方式,开发者只需遵循其规范,填充业务逻辑代码,利用其提供的功能模块(如路由、控制器、数据库操作、模板渲染等),即可高效构建出符合需求的应用系统。

如何设计框架

一、理念层:框架应该实现什么?

  1. 框架的定位与目标业务是什么?我们需要定义哪些核心组件?
    明确这个框架希望解决的核心问题,是快速构建 API 服务、构建事件驱动系统,还是实现微服务平台?不同的目标决定了组件的数量、功能与复杂度。
    例如:
    • Flask 目标是 轻量化 HTTP 服务开发 ,核心组件只有路由、视图函数、请求对象。
    • Django 面向 企业级全栈 Web 构建 ,则需要模型、ORM、模板、路由、中间件等多个子系统。
  2. 组件之间如何连接与通信?消息如何传递?
    模块之间的通信机制决定了耦合程度与性能表现。你需要考虑:
    • 是否是函数/类级别的直接调用(适合同步逻辑)?
    • 是否通过事件机制、回调、hook?
    • 是否通过中间件、队列、网络(如RPC、Socket)传递消息?
    • 是否要引入消息总线或调度中心?
  3. 业务处理模式是什么?IO密集还是CPU密集?
    处理模式决定框架的执行模型:
    • IO密集适合使用异步(如 asyncio / Reactor 模型)或协程(如 gevent)
    • CPU密集可能需要线程池、进程池、多核并发、任务队列
    • 是否需要任务调度器、工作线程、Pipeline?
  4. 如何将业务逻辑嵌入框架?
    业务逻辑是否:
    • 以对象方式加载(如类继承 / 注册)
    • 以函数或 handler 方式注入(如 @route('/hello') 装饰器)
    • 以配置文件、动态加载脚本、插件的形式组合?
      这决定了用户使用框架的便捷性与灵活性。
  5. 如何让框架对使用者更加友好?
    框架应简洁清晰,做到“开箱即用”,建议:
    • 暴露清晰的接口定义与使用流程(如生命周期钩子、事件注册)
    • 屏蔽底层复杂逻辑,让使用者只专注于业务构建
    • 提供自动注册、默认配置、开发文档与示例模板等

二、实现层:框架如何做好这件事?

  1. 生命周期管理:如何启动、初始化、销毁及异常恢复?
    • 是否有清晰的启动流程(如初始化组件、加载配置、注册服务等)?
    • 异常时如何保证资源释放、服务恢复?
    • 是否支持热更新、平滑重启?
  2. 可扩展性与插件机制:哪些点可以被用户自定义或替换?
    • 是否允许中间件注册、事件钩子?
    • 是否支持插件注册系统?
    • 是否可以让用户替换核心模块(如认证器、调度器)?
  3. 上下文管理机制:如何在框架中保存请求状态、线程变量?
    • 是否支持每个请求/任务的上下文隔离(如 Flask 的 g, request)?
    • 是否支持异步场景下的上下文切换?
    • 是否支持临时变量、元信息(trace id、请求时间等)管理?
  4. 错误处理与调试能力:如何统一日志、错误、告警?
    • 是否提供统一的异常捕获与响应机制?
    • 是否支持开发/生产环境分离?
    • 日志是否结构化、支持 traceId、debug 等调试信息?
  5. 测试与开发支持:是否易于测试与持续集成?
  • 是否可插入 mock 流程?
  • 是否有统一的测试入口、调试 API?
  • 是否易于自动化测试、接口验证、异常场景模拟?

RPC框架实现

RPC协议本身仍然是一段编解码的交互过程,相比HTTP或者WebSocket的文本/数据交互。RPC更像是在通过网络通信传递一个函数的“上下文”到远端,远端使用消费端提供的信息,在本地查询函数执行并返回。

请求接口

1
2
3
4
5
6
7
8
9
10
public class Request implements Serializable{
private String requestId ;
private String className ;
private String methodName ;
private String[] types ;
private Object[] args ;
private String clientApplicationName ;
private String clientIp ;
private ServiceConfig service ;
}

响应接口

1
2
3
4
5
6
7
8
9
public class Response implements Serializable{
private String requestId ;
private String isSuccess ;
private Object result ;
private Throwable error ;
/ **
* 省略getter 和 setter 方法
** /
}

定义 序列化 编解码方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// encode 
public class Encoder extends MessageToByteEncoder {
private Class <?> genericClass ;

public Encoder(Class<?>genericClass){
this.genericClass = genericClass;
}

@Override
public void encode(ChannelHandlerContext ctx , Object msg , Bytebuf buf) throws Exception{
if (genericClass.isInstance(msg)){
byte[] data = ProtostuffSerialization.serialize(msg);
// 序列化消息
out.writeInt(data.length);
// 向通道内写入 数据长度
out.writeBytes(data);
// 向通道内写入 数据
}
}

}

// decode
public class Decoder extends ByteToMessageDecoder {
private Class<?> genericClass ;

public Decoder(Class<?> genericClass){
this.genericClass = genericClass ;
}

@Override
public void decode(ChannelHandlerContext ctx ,Bytebuf in ,List<Object> out) throws Exception{
if(in.readableBytes() < 4){
return ;
}
// 判断收到的长度大小,解析数据长度内容
in.markReaderIndex();
// 如果大于4 够获取到数据长,标记当前位置
int dataLength = in.readInt();
// 按整数读 获取数据长
if(dataLength < 0){
ctx.close();
}
// 如果长度小于0 结束
if(in.readableBytes()< dataLength){
in.resetReaderIndex();
return;
}
// 如果读到的字节数小于数据长度,重置读取位置。
byte[] data = new byte[dataLength];
in.readBytes(data);
// 最终等待所有数据被读入后,创建字节数组接收读入的数据

Object obj = ProtostuffSerialization.deserialize(data,genericClass);
// 将字节数组反序列化为protos对象
out.add(obj);
// 将对象传入流水线执行下一过程
}
}

RFC通信 创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
public class Server extends Thread {
// 继承多线程方法
private Logger logger = Logger.getLogger(Server.class);
private Integer port ;
public Server(int port){
this.port =port ;
}

@Override
public void run(){
// 重新run方法
logger.info("server start ...");
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 创建连接线程组
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 创建执行线程组
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 创建通信脚手架
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>()
{
ch.pipeline().addLast(new Decoder(Request.class));
ch.pipeline().addLast(new Encoder(Response.class));
ch.pipeline().addLast(new ServerHandler());
})
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true);
/** 将线程组添加到通信组中,设置通道中使用的数据包装类,创建执行器,将解码器,
* 编码器,RPC执行器 加入到pipeline中,设置TCP连接选项
**/
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
// 创建一个异步接收对象 绑定端口。
logger.info("server started successfully, listened[" + port + "]port");
channelFuture.channel().closeFuture().sync();
// 调用结束后关闭通道
}catch (Exception e){
logger.error("server failed to start , listened["+port+"]port");
}finally{
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}

// 处理器实现
public class ServerHandler extends ChannelInboundHandlerAdapter{
private Logger logger = Logger.getLogger(ServerHandler.class);

@Override
public void channelRead(ChannelHandlerContext ctx , Object msg){
// 重写channelRead方法。
Response response = new Response();
try {
Request request = (Request)msg ;
logger.info("the server receives the message " + request.getRequestId());
Map<String,ServiceConfig>serviceMap = SpringUtil.getApplicationContext().
.getBeansOfType(ServiceConfig.class);
//获取本地注册的服务信息
ServiceConfig service = null ;
for(String key : serviceMap.keySet()){
if(serviceMap.get(key).getName().equals(request.getClassName())){
service = serviceMap.get(key);
break;
}
}
// 查询请求中的目标类。
if(service == null){
throw new RuntimeException("no service found :" + request.getClassName());
}
Object serviceImpl = SpringUtil.getApplication().getBean(service.getRef());
// 获取实现类
if(serviceImpl == null){
throw new RuntimeException("no avaliable service found: " + request.getClassName());
}
Map<String, Object>map = TypeParseUtil.parseTypeScript2Class(request.getTypes(),request.getArgs());
Class<?>[] classType = (Class<?>[]) map.get("classTypes");
// 获取类名
Object[] args = (Object[]) map.get("args");
// 获取参数
Object result = serviceImpl.getClass().getMethod(request.getMethodName(),classTypes).invoke(serviceImpl,args);
// 执行,返回结果。
response.setResult(result);
response.setSuccess(true);
}catch (Throwable e){
logger.error("the server failed to process the request.", e);
response.setSuccess(false);
response.setError(e);
}

ctx.write(response);
ctx.flush();

}

}

// 客户端
public class Client{
private Logger logger = Logger.getLogger(Client.class);
private ReferenceConfig referenceConfig ;
private ChannelFuture channelFuture ;
private ClientHandler clientHandler ;
public Client(ReferenceConfig referenceConfig){
this.referenceConfig = referenceConfig;
}

public ServiceConfig connectServer(){
// 执行 服务建立逻辑
logger.info("connecting to the server: "+ referenceConfig.getDirectServerIp() + ":" + referenceConfig.getDirectServerPort());
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 创建业务线程组
Bootstrap Bootstrap = new Bootstrap();
// 脚手架
Bootstrap.group(workerGroup).channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE,true);
.handler(new ChannelInitializer<SocketChannel>(){
@Override
public void initChannel(SocketChannel ch) throws Exception{
ch.pipeline().addLast(new Encoder(Request.class));
ch.pipeline().addLast(new Decoder(Response.class));

clientHandler = new ClientHandler();
ch.pipeline().addLast(new RpcReadTimeoutHandler(clientHandler,referenceConfig.getTimeout(),TimeUnit.MILLISECONDS));
ch.pipeline().addLast(clientHandler);
}
});
//设置TCP选项,向线程组中pipeline中添加处理器
try{
if(!StringUtils.isEmpty(referenceConfig.getDirectServerIp())){
channelFuture = Bootstrap.connect(referenceConfig.getDirectServerIp(),
referenceConfig.getDirectServerPort()).sync();
logger.info("successfully connected");
}
}catch (Exception e){
throw new RuntimeException(e);
}
return null ;
}
public Response remoteCall(Request request) throws Throwable{
channelFuture.channel().writeAndFlush(request).sync();
// 向通道写入request
Response response = clientHandler.getResponse();
// 接受响应信息
logger.info("receive a response from the server :" + response.getRequestId());
channelFuture.channel().closeFuture().sync();
if(response.getSuccess()){
return response ;
}
throw response.getError();
}

}

本地存根实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class InvokerInvocationHandler implements InvocationHandler{
/* 本地存根是通过创建一个代理,接受目标调用函数的接口,组装一个request对象,发送给服务端
服务端接收到request对象后从中获取接口信息,类名信息,和方法在服务端执行
*/
private Logger logger = Logger.getLogger(InvokerInvocationHandler.class);
private ReferenceConfig referenceConfig ;
public InvokerInvocationHandler(ReferenceConfig referenceConfig){
this.referenceConfig = referenceConfig ;
}
@Override
public Object invoke(Object proxy , Method method , Object[] args) throws Throwable{
// 重写invoke ,使得本地的invoke 指向远程调用
return invoke(method.getName(),method.getParameterTypes(), args);
}
public Object invoke(String methodName , Class[] argTypes , Object[] args) throws Throwable {
// 将对动态代理的调用改为调用remoteCall
return remoteCall(referenceConfig , methodName , argTypes , args );
}

public Object remoteCall(ReferenceConfig reference , String methodName, Class[] argTypes , Object[] args) throws Throwable{
// 创建remoteCall 方法,组装request对象
Request request = new Request();
request.setRequestId(RpcContext.getUUid().get());
request.setClientApplicationName(RpcContext.getApplicationName());
request.setClientIp(RpcContext.getLocalIp());
request.setClassName(referenceConfig.getName());
request.setMethodName(methodName);
request.setTypes(getTypes(argTypes));
request.setArgs(args);
Response response ;
try {
Client client = new Client(reference);
ServiceConfig service = client.connectionServer();
// 连接对端。
request.setService(service);
response = client.remoteCall(request);
// 执行调用
return response.getResult();
} catch (Throwable e){
logger.error(e);
throw e;
}
}

private String[] getTypes(Class<?>[] methodTypes){
String[] types = new String[methodTypes.length];
for(int i = 0 ; i < methodTypes.length ; i++){
types[i] = methodTypes[i].getName();
}
return types ;
}
}

RPC框架实现
http://gadoid.io/2025/05/17/RPC框架实现/
作者
Codfish
发布于
2025年5月17日
许可协议