kafka之五-网络模型

kafka server在启动的时候会初始化SocketServer、KafkaApis 和 KafkaRequestHandlerPool这三个对象,它们是kafka网络处理模型的主要组成部分,kafka所有的请求都是通过TCP网络以socket的方式进行通讯的。该网络模型是基于java NIO机制实现的,根据其特点可以简单归纳为1+N+M模型,它与Reactor模式类似,其完整流程图如下:

分别来看下Acceptor,Processor,KafkaRequestHandle这三部分:

  1. 1个Acceptor线程,负责监听socket新的连接请求,注册了了OP_ACCEPT事件,将新的连接按照round robin方式交给对应的Processor线程处理。
  2. N个Processor线程,其中每个Processor都有自己的selector,它会向Acceptor分配的SocketChannel注册相应的OP_READ事件,N的大小可由num.networker.threads配置,默认为3
  3. M个KafkaRequestHandler线程处理请求,并将处理的结果返回给Processor线程对应的response queue,由Processor将处理的结果返回给相应的请求发送者,M的大小可由num.io.threads配置,默认为8

再对着上图说下流程:

  1. Acceptor监听到新连接,然后按round robin的方式交给对应的Processor进行处理。
  2. Processor在SocketChannel上注册OP_READ事件并等待事件发生
  3. 当请求到来时,Processor会将请求放入到一个Request Queue中,这是所有Processor共有的一个队列
  4. KafkaRequestHandler请求从Reqeust Queue中取出,并调用相应KafkaApis进行处理(如果是producer生产请求,则将消息写入到底层的磁盘日志中;如果是fetch请求,则从磁盘或页缓存中读取消息)
  5. 处理的结果会放入到Processor对应的Response Queue中(每个请求都有标识它们来自哪个Processor),Request Queue的数量与Processor数量一致
  6. Processor从对应Response Queue中取出reponse并返回给对应的请求者。

需要说明下:
Request Queue是所有Processor公用的一个队列,而Response Queue则是与Processor一一对的,因为每个Processor监听的SocketChannel并不是同一批,如果公有一个Response Queue,那么这N个Processor的selector就需要监听所有的SocketChannel,而Processor与Response Queue一一对应则Processor相应的selector只需要关注分配给自己的SocketChannel即可。

再补充下Purgatory组件,如下图:

Purgatory组件是用来缓存延时请求(Delayed Request)的,所谓延时请求,就是那些一时未满足条件不能立刻处理的请求,比如设置了acks=all的producer请求,一旦设置了acks=all,那么该请求就必须等待ISR中所有副本都接收了消息后才能返回,此时算是该请求的IO线程就必须等待其他broker的写入结果。当请求不能立刻处理时,它就会暂存在Purgatory中,稍后一旦满足了条件,IO线程会继续处理该请求,并将Response放入对应网络线程的响应队列中。

最后再来一张kafka的结构图:

参考文章:深入浅出kafka原理-4-kafka网络机制原理

作者:步履不停原文地址:https://segmentfault.com/a/1190000042246238

%s 个评论

要回复文章请先登录注册