博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark2.1.0之源码分析——RPC管道初始化
阅读量:6166 次
发布时间:2019-06-21

本文共 2385 字,大约阅读时间需要 7 分钟。

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81197447

提示:阅读本文前最好先阅读:

在一文的代码清单6——创建TransportClient,《一文的代码清单3——对TransportServer初始化的实现中都在管道初始化回调函数中调用了TransportContext的initializePipeline方法,initializePipeline方法(见代码清单1)将调用Netty的API对管道初始化。

代码清单1         管道初始化

public TransportChannelHandler initializePipeline(      SocketChannel channel,      RpcHandler channelRpcHandler) {    try {      TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);      channel.pipeline()        .addLast("encoder", ENCODER)        .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())        .addLast("decoder", DECODER)        .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))        .addLast("handler", channelHandler);      return channelHandler;    } catch (RuntimeException e) {      logger.error("Error while initializing Netty pipeline", e);      throw e;    }  }

根据代码清单1,initializePipeline方法的执行步骤如下:

  1. 调用createChannelHandler方法创建TransportChannelHandler,从createChannelHandler的实现(见代码清单2)中可以看到真正创建TransportClient是在这里发生的。从TransportClient的构造过程看到RpcHandler 与TransportClient毫无关系,TransportClient只使用了TransportResponseHandler。TransportChannelHandler在服务端将代理TransportRequestHandler对请求消息进行处理,并在客户端代理TransportResponseHandler对响应消息进行处理。
  2. 对管道进行设置,这里的ENCODER(即MessageEncoder)派生自Netty的ChannelOutboundHandler接口;DECODER(即MessageDecoder)、TransportChannelHandler以及TransportFrameDecoder(由工具类NettyUtils的静态方法createFrameDecoder创建)派生自Netty的ChannelInboundHandler接口;IdleStateHandler同时实现了ChannelOutboundHandler和ChannelInboundHandler接口。根据Netty的API行为,通过addLast方法注册多个handler时,ChannelInboundHandler按照注册的先后顺序执行;ChannelOutboundHandler按照注册的先后顺序逆序执行,因此在管道两端(无论是服务端还是客户端)处理请求和响应的流程如图1所示。

代码清单2         创建TransportChannelHandler

private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);    TransportClient client = new TransportClient(channel, responseHandler);    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,      rpcHandler);    return new TransportChannelHandler(client, responseHandler, requestHandler,      conf.connectionTimeoutMs(), closeIdleConnections);  }

管道处理请求和响应的流程图

图1       管道处理请求和响应的流程图

 

关于《Spark内核设计的艺术 架构设计与实现》

经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:

 

纸质版售卖链接如下:

京东:

你可能感兴趣的文章