@@ -45,8 +45,8 @@ Client invoke => message object => encode to bytes => tranport through net => se
4545# IO传输
4646
4747## TCP粘包、拆包
48- tcp传输过程中的任何一个节点都可能会将数据包拆分或合并,最终保证的是数据到达终点的顺序是一致的。由于TCP只关心字节数组流,并不知晓上层的数据格式。所以需要应用层来处理数据是否完整的问题,一般在数据协议上会采用一个字段来表示数据的长度,知道了消息的长度,就可以解决粘包的问题。对于拆包问题,当读到的数据长度比数据长度小时,要继续等待数据。Netty中提供了` LengthFieldBasedFrameDecoder ` 这个类帮助我们简化粘包、拆包问题,如我们定义协议为表示数据长度的4字节 + 数据,数据长度不包括长度字段本身,假设传输数据"0101"会被转化为 0x 00 00 00 04 01 01。
49- 解码时 ,` LengthFieldBasedFrameDecoder ` 中的decode方法中的
48+ tcp传输过程中的任何一个节点都可能会将数据包拆分或合并,最终保证的是数据到达终点的顺序是一致的。由于TCP只关心字节数组流,并不知晓上层的数据格式。所以需要应用层来处理数据是否完整的问题,一般在数据协议上会采用一个字段来表示数据的长度,知道了消息的长度,就可以解决粘包的问题。对于拆包问题,当读到的数据长度比数据长度小时,要继续等待数据。Netty中提供了` LengthFieldBasedFrameDecoder ` 这个类帮助我们简化粘包、拆包问题,如我们定义协议为表示数据长度的4字节 + 数据,数据长度不包括长度字段本身,假设传输数据"0101"被转化为 0x 00 00 00 04 01 01。可以使用LengthFieldBasedFrameDecoder(0, 4, 0, 4)来进行解码 。
49+ 解码遇到拆包时 ,` LengthFieldBasedFrameDecoder ` 中的decode方法中的
5050```
5151int frameLengthInt = (int) frameLength;
5252 if (in.readableBytes() < frameLengthInt) {
@@ -56,15 +56,279 @@ int frameLengthInt = (int) frameLength;
5656会返回null,数据会在` ByteToMessageDecoder ` 中累积,直到数据累积充足,解码后返回对象,由后续的Handler处理。
5757
5858# 服务的注册发布和监听
59- 类似于域名访问的问题,我们无需记住一个http服务后的服务器是哪些,它们的变更对我们都是透明的。对应RPC服务,经常需要使用集群来保证服务的稳定性和共同提高系统的性能。为此需要提供一个注册中心,当服务器启动时进行服务注册,服务器宕机时注册中心能够检测到并将其从服务注册中心删除。客户端要访问一个服务时先到注册中心获取服务列表,然后建立连接进行访问,并且当服务列表发生变化时会收到通知 。
59+ 类似于域名访问的问题,我们无需记住一个http服务后的服务器是哪些,它们的变更对我们都是透明的。对应RPC服务,经常需要使用集群来保证服务的稳定性和共同提高系统的性能。为此需要提供一个注册中心,当服务器启动时进行服务注册,服务器宕机时注册中心能够检测到并将其从服务注册中心删除。客户端要访问一个服务时先到注册中心获取服务列表,缓存到本地, 然后建立连接进行访问,并且当服务列表发生变化时会收到通知并修改本地缓存 。
6060![ 注册发布] ( http://oek9m2h2f.bkt.clouddn.com/RpcPubSub.png )
6161
62+ # 服务路由
63+ 有注册中心后,调用方调用服务提供者时可以动态的获取调用方的地址等信息进行选择,类似域名的机制。这样增加了一层抽象,避免了写死ip等问题。又一次说明了` Any problem in computer science can be solved by adding another level of indirection ` 。当有多个服务提供者时,调用方需要在其中选择一个进行调用,常见的调用策略有随机、Round-Robin(轮询)、权重比例等。采取权重方式可以通过服务调用的耗时、异常数量进行动态调用权重,也可以进行人工调整。当我们需要更复杂的控制策略是,可以通过脚本编写策略,并可以动态修改。
64+
65+ # IO调用方式
66+ ## nio和bio
67+ bio指的是传统的阻塞io,在Java中使用方式是Socket、ServerSocket、InputStream、OutputStream的组合。当读数据是,如果没有数据可读,该线程会被切换为阻塞状态,直到数据可读等待处理器调度运行,会进行两次上下文切换和两次用户态内核态切换,并且这样一个线程同时只能处理一个连接,线程是比较宝贵的资源,除了创建和销毁线程的开销外,JVM中每个线程都会有1MB左右的栈大小,这样一线程一连接的方式无法应对单机数万的情况,这时的内存消耗和上下文切换的成本都非常高。Nio指non blocking io,即非阻塞io,当数据不可读时会返回一个错误而不是阻塞,在Java中,常用Selector、SocketChannel、ServerSocketChannel、ByteBuffer的组合实现nio服务,在一个Selector上可以监听多个连接的是否可读、可写、可连接等状态,这样一个线程就可以同时处理很多个连接,能够提高系统连接能力。
68+ 下面的经典的几张图可以说明bio和nio的区别
69+ ![ bio] ( http://oek9m2h2f.bkt.clouddn.com/io:bio )
70+ ![ nio] ( http://oek9m2h2f.bkt.clouddn.com/nio.jpg )
71+ ## 同步和异步
72+ 同步指发出一个请求后是否阻塞并一直等待结果返回,而异步可以在发送请求后先去执行其他任务,在一段时间后再获取结果或通过注册监听器设置回调。在Java中一般是通过Future或者一些Listener来实现异步调用。如ExecutorService.submit()方法返回一个Future,调用Future的get时会阻塞,可以在get时设置超时时间。Guava和Netty中的Future实现可以设置Listener在结果成功或失败时进行回调。
73+
74+ # 实现
75+ 下面利用一些好用的框架帮助我们快速的实现一个RPC。 源代码在 * [ simple-rpc] ( https://github.com/liuzhengyang/simple-rpc ) *
76+ * netty 负责数据传输部分,netty作为异步事件驱动的高性能IO框架,使用方便久经考验,比手工编写nio代码方便不易出错。
77+ * protostuff 负责序列化和反序列化。google的 protobuf需要编写IDL文件然后生成,好处是能够生成各个语言的代码。但是开发起来很繁琐,使用protostuff免去编写IDL文件并生成的痛苦。
78+
79+
80+ ## 请求和返回的抽象
81+ ```
82+ @Data
83+ public class Request {
84+ private long requestId;
85+ private Class<?> clazz;
86+ private String method;
87+ private Class<?>[] parameterTypes;
88+ private Object[] params;
89+ private long requestTime;
90+ }
91+ @Data
92+ public class Response {
93+ private long requestId;
94+ private Object response;
95+ }
6296
63- # 消息协议
64- 当前采用简单的在消息体前加上4byte的消息长度值
6597
66- ## 使用示例
67- // 需要先启动一个zookeeper作为服务注册发现中心
98+ ```
99+
100+ ## 编解码部分
101+ ```
102+ public class RequestCodec extends ByteToMessageCodec<Request>{
103+ protected void encode(ChannelHandlerContext ctx, Request msg, ByteBuf out) throws Exception {
104+ byte[] bytes = Serializer.serialize(msg);
105+ int length = bytes.length;
106+ out.writeInt(length);
107+ ByteBuf byteBuf = out.writeBytes(bytes);
108+ }
109+
110+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
111+ int length = in.readInt();
112+ byte[] buffer = new byte[length];
113+ in.readBytes(buffer);
114+ Request request = Serializer.deserialize(Request.class, buffer);
115+ out.add(request);
116+ }
117+ }
118+
119+ public class ResponseCodec extends ByteToMessageCodec<Response>{
120+ private static final Logger LOGGER = LoggerFactory.getLogger(ResponseCodec.class);
121+
122+ protected void encode(ChannelHandlerContext ctx, Response msg, ByteBuf out) throws Exception {
123+ LOGGER.info("Encode {}", msg);
124+ byte[] bytes = Serializer.serialize(msg);
125+ int length = bytes.length;
126+ out.writeInt(length);
127+ ByteBuf byteBuf = out.writeBytes(bytes);
128+ }
129+
130+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
131+ int length = in.readInt();
132+ byte[] buffer = new byte[length];
133+ in.readBytes(buffer);
134+ Response response = Serializer.deserialize(Response.class, buffer);
135+ out.add(response);
136+ LOGGER.info("Decode Result: {}", response);
137+ }
138+ }
139+
140+
141+ ```
142+
143+
144+ ## Client
145+ ```
146+ public class RpcClientHandler extends SimpleChannelInboundHandler<Response>{
147+ private static final Logger LOGGER = LoggerFactory.getLogger(RpcClientHandler.class);
148+
149+ protected void channelRead0(ChannelHandlerContext ctx, Response msg) throws Exception {
150+ LOGGER.info("Receive {}", msg);
151+ BlockingQueue<Response> blockingQueue = RpcClient.responseMap.get(msg.getRequestId());
152+ blockingQueue.put(msg);
153+ }
154+ }
155+
156+ public class RpcClient {
157+ private static AtomicLong atomicLong = new AtomicLong();
158+ private String serverIp;
159+ private int port;
160+ private boolean started;
161+ private Channel channel;
162+ EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
163+ public static ConcurrentMap<Long, BlockingQueue<Response>> responseMap = new ConcurrentHashMap<Long, BlockingQueue<Response>>();
164+
165+ public RpcClient(String serverIp, int port) {
166+ this.serverIp = serverIp;
167+ this.port = port;
168+ }
169+
170+ public void init() {
171+ Bootstrap bootstrap = new Bootstrap();
172+ bootstrap.channel(NioSocketChannel.class)
173+ .group(eventLoopGroup)
174+ .handler(new ChannelInitializer<Channel>() {
175+ protected void initChannel(Channel ch) throws Exception {
176+ ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO))
177+ .addLast(new ResponseCodec())
178+ .addLast(new RpcClientHandler())
179+ .addLast(new RequestCodec())
180+ ;
181+ }
182+ });
183+ try {
184+ ChannelFuture f = bootstrap.connect(serverIp, port).sync();
185+ this.channel = f.channel();
186+ } catch (InterruptedException e) {
187+ e.printStackTrace();
188+ }
189+ }
190+
191+ public Response sendMessage(Class<?> clazz, Method method, Object[] args) {
192+ Request request = new Request();
193+ request.setRequestId(atomicLong.incrementAndGet());
194+ request.setMethod(method.getName());
195+ request.setParams(args);
196+ request.setClazz(clazz);
197+ request.setParameterTypes(method.getParameterTypes());
198+ this.channel.writeAndFlush(request);
199+ BlockingQueue<Response> blockingQueue = new ArrayBlockingQueue<Response>(1);
200+ responseMap.put(request.getRequestId(), blockingQueue);
201+ try {
202+ return blockingQueue.take();
203+ } catch (InterruptedException e) {
204+ e.printStackTrace();
205+ return null;
206+ }
207+ }
208+
209+ public <T> T newProxy(final Class<T> serviceInterface) {
210+ Object o = Proxy.newProxyInstance(RpcClient.class.getClassLoader(), new Class[]{serviceInterface}, new InvocationHandler() {
211+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
212+ return sendMessage(serviceInterface, method, args).getResponse();
213+ }
214+ });
215+ return (T) o;
216+ }
217+
218+ public void destroy() {
219+ try {
220+ this.channel.close().sync();
221+ } catch (InterruptedException e) {
222+ e.printStackTrace();
223+ } finally {
224+ eventLoopGroup.shutdownGracefully();
225+ }
226+ }
227+
228+ }
229+
230+ ```
231+
232+ ## Server
233+ ```
234+ public class RpcServer {
235+ private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class);
236+
237+ private String ip;
238+ private int port;
239+ private boolean started = false;
240+ private Channel channel;
241+ private Object serviceImpl;
242+
243+ private EventLoopGroup bossGroup = new NioEventLoopGroup();
244+ private EventLoopGroup workerGroup = new NioEventLoopGroup();
245+
246+ public RpcServer(int port, Object serviceImpl) {
247+ this.port = port;
248+ this.serviceImpl = serviceImpl;
249+ }
250+
251+ public void init() {
252+ ServerBootstrap bootstrap = new ServerBootstrap();
253+ bootstrap.group(bossGroup, workerGroup)
254+ .channel(NioServerSocketChannel.class)
255+ .childHandler(new ChannelInitializer<SocketChannel>() {
256+ protected void initChannel(SocketChannel ch) throws Exception {
257+ ch.pipeline()
258+ .addLast(new LoggingHandler(LogLevel.INFO))
259+ .addLast(new RequestCodec())
260+ .addLast(new RpcServerHandler(serviceImpl))
261+ .addLast(new ResponseCodec())
262+ ;
263+ }
264+ });
265+ try {
266+ ChannelFuture sync = bootstrap.bind(port).sync();
267+ LOGGER.info("Server Started At {}", port);
268+ started = true;
269+ this.channel = sync.channel();
270+ sync.channel().closeFuture().sync();
271+ } catch (InterruptedException e) {
272+ e.printStackTrace();
273+ } finally {
274+ bossGroup.shutdownGracefully();
275+ workerGroup.shutdownGracefully();
276+ }
277+ }
278+ }
279+
280+ public class RpcServerHandler extends SimpleChannelInboundHandler<Request> {
281+ private Object service;
282+
283+ public RpcServerHandler(Object serviceImpl) {
284+ this.service = serviceImpl;
285+ }
286+
287+ protected void channelRead0(ChannelHandlerContext ctx, Request msg) throws Exception {
288+ String methodName = msg.getMethod();
289+ Object[] params = msg.getParams();
290+ Class<?>[] parameterTypes = msg.getParameterTypes();
291+ long requestId = msg.getRequestId();
292+ Method method = service.getClass().getDeclaredMethod(methodName, parameterTypes);
293+ method.setAccessible(true);
294+ Object invoke = method.invoke(service, params);
295+ Response response = new Response();
296+ response.setRequestId(requestId);
297+ response.setResponse(invoke);
298+ ctx.pipeline().writeAndFlush(response);
299+ }
300+ }
301+
302+ ```
303+
304+ ## 序列化部分
305+ ```
306+ public class Serializer {
307+ public static byte[] serialize(Object obj){
308+ RuntimeSchema schema = RuntimeSchema.createFrom(obj.getClass());
309+ LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
310+ return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
311+ }
312+
313+
314+ public static <T> T deserialize(Class<T> clazz, byte[] bytes) {
315+ try {
316+ T t = clazz.newInstance();
317+ RuntimeSchema schema = RuntimeSchema.createFrom(clazz);
318+ ProtostuffIOUtil.mergeFrom(bytes, t, schema);
319+
320+ return t;
321+ } catch (InstantiationException e) {
322+ e.printStackTrace();
323+ } catch (IllegalAccessException e) {
324+ e.printStackTrace();
325+ }
326+ return null;
327+ }
328+ }
329+
330+ # SimpleRpc使用示例
331+ *需要先启动一个zookeeper作为服务注册发现中心*
68332```
69333// 服务接口
70334public interface IHello {
@@ -93,14 +357,14 @@ public class HelloImpl implements IHello {
93357// 客户端代码
94358// beanJavaConfig方式
95359@Bean
96- public CountService countService() {
97- RpcClientWithLB rpcClientWithLB = new RpcClientWithLB("fyes-counter");
98- rpcClientWithLB.setZkConn("127.0.0.1:2181");
99- rpcClientWithLB.init();
100- CountService countService = rpcClientWithLB.newProxy(CountService.class);
101- return countService;
102- }
103-
360+ public CountService countService() {
361+ RpcClientWithLB rpcClientWithLB = new RpcClientWithLB("fyes-counter");
362+ rpcClientWithLB.setZkConn("127.0.0.1:2181");
363+ rpcClientWithLB.init();
364+ CountService countService = rpcClientWithLB.newProxy(CountService.class);
365+ return countService;
366+ }
367+
104368// 服务端发布
105369// xml配置方式
106370<bean class =" com.github.liuzhengyang.simplerpc.ServerFactoryBean " init-method =" start " >
0 commit comments