
| public class Server extends Thread { private Logger logger = Logger.getLogger(Server.class); private Integer port ; public Server(int port){ this.port =port ; }
@Override public void run(){ logger.info("server start ..."); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { ch.pipeline().addLast(new Decoder(Request.class)); ch.pipeline().addLast(new Encoder(Response.class)); ch.pipeline().addLast(new ServerHandler()); }) .option(ChannelOption.SO_BACKLOG,128) .childOption(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); logger.info("server started successfully, listened[" + port + "]port"); channelFuture.channel().closeFuture().sync(); }catch (Exception e){ logger.error("server failed to start , listened["+port+"]port"); }finally{ workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
public class ServerHandler extends ChannelInboundHandlerAdapter{ private Logger logger = Logger.getLogger(ServerHandler.class);
@Override public void channelRead(ChannelHandlerContext ctx , Object msg){ Response response = new Response(); try { Request request = (Request)msg ; logger.info("the server receives the message " + request.getRequestId()); Map<String,ServiceConfig>serviceMap = SpringUtil.getApplicationContext(). .getBeansOfType(ServiceConfig.class); ServiceConfig service = null ; for(String key : serviceMap.keySet()){ if(serviceMap.get(key).getName().equals(request.getClassName())){ service = serviceMap.get(key); break; } } if(service == null){ throw new RuntimeException("no service found :" + request.getClassName()); } Object serviceImpl = SpringUtil.getApplication().getBean(service.getRef()); if(serviceImpl == null){ throw new RuntimeException("no avaliable service found: " + request.getClassName()); } Map<String, Object>map = TypeParseUtil.parseTypeScript2Class(request.getTypes(),request.getArgs()); Class<?>[] classType = (Class<?>[]) map.get("classTypes"); Object[] args = (Object[]) map.get("args"); Object result = serviceImpl.getClass().getMethod(request.getMethodName(),classTypes).invoke(serviceImpl,args); response.setResult(result); response.setSuccess(true); }catch (Throwable e){ logger.error("the server failed to process the request.", e); response.setSuccess(false); response.setError(e); }
ctx.write(response); ctx.flush();
}
}
public class Client{ private Logger logger = Logger.getLogger(Client.class); private ReferenceConfig referenceConfig ; private ChannelFuture channelFuture ; private ClientHandler clientHandler ; public Client(ReferenceConfig referenceConfig){ this.referenceConfig = referenceConfig; }
public ServiceConfig connectServer(){ logger.info("connecting to the server: "+ referenceConfig.getDirectServerIp() + ":" + referenceConfig.getDirectServerPort()); EventLoopGroup workerGroup = new NioEventLoopGroup(); Bootstrap Bootstrap = new Bootstrap(); Bootstrap.group(workerGroup).channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE,true); .handler(new ChannelInitializer<SocketChannel>(){ @Override public void initChannel(SocketChannel ch) throws Exception{ ch.pipeline().addLast(new Encoder(Request.class)); ch.pipeline().addLast(new Decoder(Response.class)); clientHandler = new ClientHandler(); ch.pipeline().addLast(new RpcReadTimeoutHandler(clientHandler,referenceConfig.getTimeout(),TimeUnit.MILLISECONDS)); ch.pipeline().addLast(clientHandler); } }); try{ if(!StringUtils.isEmpty(referenceConfig.getDirectServerIp())){ channelFuture = Bootstrap.connect(referenceConfig.getDirectServerIp(), referenceConfig.getDirectServerPort()).sync(); logger.info("successfully connected"); } }catch (Exception e){ throw new RuntimeException(e); } return null ; } public Response remoteCall(Request request) throws Throwable{ channelFuture.channel().writeAndFlush(request).sync(); Response response = clientHandler.getResponse(); logger.info("receive a response from the server :" + response.getRequestId()); channelFuture.channel().closeFuture().sync(); if(response.getSuccess()){ return response ; } throw response.getError(); }
}
|