本文将主要介绍Server端处理一次请求的流程,同时讲解一个比较巧妙的设计——Filter。
根据前面的分析我们可以推断出Server端处理网络通信的组件为NettyServer,对应处理具体事件的handler为NettyHandler,它的构造函数需要一个ChannelHandler的参数,这里传递的就是NettyServer实例的引用。这样一来,handler对messageReceived()的事件处理,又传递给了NettyServer的receive()方法
1 2 3 4 5 6 7 8 9 |
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { handler.received(channel, e.getMessage()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } } |
NettyServer本身没有实现receive方法,这个调用由基类AbstractPeer处理,而它也是再调用自己维护的ChannelHandler,也就是构造NettyServer时传入的handler。这是一个ChannelHandlerDispatcher实例,它允许同时触发一组普通的handler。实际构建时的handler为new DecodeHandler(new HeaderExchangeHandler(handler)),HeaderExchangeHandler中有一部分处理的逻辑,同时还会调用外部传递handler。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
public void received(Channel channel, Object message) throws RemotingException { ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { Response response = handleRequest(exchangeChannel, request); channel.send(response); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { ... } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); // find handler by message class. Object msg = req.getData(); try { // handle data. Object result = handler.reply(channel, msg); res.setStatus(Response.OK); res.setResult(result); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); } return res; } |
最后调用了handelr.reply()方法, 它的实现与具体的协议有关,比如默认配置下就是在DubboProtocol中构建的requestHandler,在createServer()方法中传递给Exchanger。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
private ExchangeServer createServer(URL url) { ... ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } ... return server; } private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } } }; Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException{ String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); return exporter.getInvoker(); } |
看到了invoker.invoke(),也就是真正执行调用的地方。这个Invoker实例来自于根据serviceKey查找的Exporter,它是通过ExtensionLoader创建的,是一个ProtocolFilterWrapper实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); } private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (filters.size() > 0) { for (int i = filters.size() - 1; i >= 0; i --) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { ... public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } ... }; } } return last; } |
这里会将原来的Invoker通过各种Filter包装成一个InvokerChain,一次调用会依次经过这些FilterChain到达最终的Filter。在这些Filter中可以进行超时校验、数据监控等工作,每个Filter相对独立,使得代码结构非常清晰,也便于为新功能进行扩展。
这个链的结构是:除了初始传入的Invoker外,对于每个Filter都新建一个Invoker,并返回最后一个创建的Invoker。当执行这些后来构建的Invoker.invoker()方法时,实际调用了filter.invoker(next, invocation),这样会去执行filter中的逻辑,然后再由filter调用下一个Invoker的invoke方法。直至最后一个原始的Invoker,它的invoke方法不会调用filter,而是正常的invoke逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
invokerN.invoke()开始 -> filterN.invoke()开始 -> ... -> invoker1.invoke()开始 -> filter1.invoke()开始 -> invoker0.invoke()开始 -> invoker0.invoke()结束 -> filter1.invoke()结束 -> invoker1.invoke()结束 -> ... -> filterN.invoke()结束 -> invokerN.invoke()结束 |
以TimeoutFilter为例具体来看一下
1 2 3 4 5 6 7 8 9 10 11 12 13 |
@Activate(group = Constants.PROVIDER) public class TimeoutFilter implements Filter { public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { long start = System.currentTimeMillis(); Result result = invoker.invoke(invocation); long elapsed = System.currentTimeMillis() - start; if (invoker.getUrl() != null && elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", Integer.MAX_VALUE)) { // log timeout info } return result; } } |
在调用下一个Invoker的前后记录时间,并将超时的信息打印出来。其中原始的Invoker来自JavassistProxyFactory创建的实例
1 2 3 4 5 6 7 8 9 10 11 |
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } |
基类的invoke方法调用子类的doInvoke,去执行真正的反射调用。Server端处理一次请求的流程就介绍到这里。
文中提到的核心类包括
- NettyServer
- NettyHandler
- HeaderExchangeHandler
- DubboProtocol
- ProtocolFilterWrapper
- JavassistProxyFactory
© 2015, 高飞航.cn. 版权所有.