1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
| public class Server extends Thread { private Logger logger = Logger.getLogger(Server.class); private Integer port ; public Server(int port){ this.port =port ; }
@Override public void run(){ logger.info("server start ..."); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { ch.pipeline().addLast(new Decoder(Request.class)); ch.pipeline().addLast(new Encoder(Response.class)); ch.pipeline().addLast(new ServerHandler()); }) .option(ChannelOption.SO_BACKLOG,128) .childOption(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); logger.info("server started successfully, listened[" + port + "]port"); channelFuture.channel().closeFuture().sync(); }catch (Exception e){ logger.error("server failed to start , listened["+port+"]port"); }finally{ workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
public class ServerHandler extends ChannelInboundHandlerAdapter{ private Logger logger = Logger.getLogger(ServerHandler.class);
@Override public void channelRead(ChannelHandlerContext ctx , Object msg){ Response response = new Response(); try { Request request = (Request)msg ; logger.info("the server receives the message " + request.getRequestId()); Map<String,ServiceConfig>serviceMap = SpringUtil.getApplicationContext(). .getBeansOfType(ServiceConfig.class); ServiceConfig service = null ; for(String key : serviceMap.keySet()){ if(serviceMap.get(key).getName().equals(request.getClassName())){ service = serviceMap.get(key); break; } } if(service == null){ throw new RuntimeException("no service found :" + request.getClassName()); } Object serviceImpl = SpringUtil.getApplication().getBean(service.getRef()); if(serviceImpl == null){ throw new RuntimeException("no avaliable service found: " + request.getClassName()); } Map<String, Object>map = TypeParseUtil.parseTypeScript2Class(request.getTypes(),request.getArgs()); Class<?>[] classType = (Class<?>[]) map.get("classTypes"); Object[] args = (Object[]) map.get("args"); Object result = serviceImpl.getClass().getMethod(request.getMethodName(),classTypes).invoke(serviceImpl,args); response.setResult(result); response.setSuccess(true); }catch (Throwable e){ logger.error("the server failed to process the request.", e); response.setSuccess(false); response.setError(e); }
ctx.write(response); ctx.flush();
}
}
public class Client{ private Logger logger = Logger.getLogger(Client.class); private ReferenceConfig referenceConfig ; private ChannelFuture channelFuture ; private ClientHandler clientHandler ; public Client(ReferenceConfig referenceConfig){ this.referenceConfig = referenceConfig; }
public ServiceConfig connectServer(){ logger.info("connecting to the server: "+ referenceConfig.getDirectServerIp() + ":" + referenceConfig.getDirectServerPort()); EventLoopGroup workerGroup = new NioEventLoopGroup(); Bootstrap Bootstrap = new Bootstrap(); Bootstrap.group(workerGroup).channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE,true); .handler(new ChannelInitializer<SocketChannel>(){ @Override public void initChannel(SocketChannel ch) throws Exception{ ch.pipeline().addLast(new Encoder(Request.class)); ch.pipeline().addLast(new Decoder(Response.class)); clientHandler = new ClientHandler(); ch.pipeline().addLast(new RpcReadTimeoutHandler(clientHandler,referenceConfig.getTimeout(),TimeUnit.MILLISECONDS)); ch.pipeline().addLast(clientHandler); } }); try{ if(!StringUtils.isEmpty(referenceConfig.getDirectServerIp())){ channelFuture = Bootstrap.connect(referenceConfig.getDirectServerIp(), referenceConfig.getDirectServerPort()).sync(); logger.info("successfully connected"); } }catch (Exception e){ throw new RuntimeException(e); } return null ; } public Response remoteCall(Request request) throws Throwable{ channelFuture.channel().writeAndFlush(request).sync(); Response response = clientHandler.getResponse(); logger.info("receive a response from the server :" + response.getRequestId()); channelFuture.channel().closeFuture().sync(); if(response.getSuccess()){ return response ; } throw response.getError(); }
}
|