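As the previous article showed, a broker creates a SocketServer object during startup and calls SocketServer#startup to start the component. SocketServer is the core of Kafka's network service: while Kafka is running, it receives network requests from clients and from other broker nodes. For performance reasons, SocketServer follows the Reactor pattern and is built on Java NIO.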
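As the diagram below illustrates, Kafka creates one Acceptor for each listener configured on the broker. A listener is an endpoint, i.e. a host, port, and security protocol. The Acceptor accepts all incoming connections on its listener. Each Acceptor owns several Processor threads, and each Processor has its own Selector, which it uses to read requests from its connections and to write responses back. Requests are processed by a pool of Handler threads shared by all Acceptors. Processors and Handlers communicate through a request queue, and each response produced by a Handler is passed back to the Processor that read the request.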
SocketServer Component
SocketServer is the manager of the whole Kafka server network layer. Its main job is to build and start the network module. SocketServer defines the following fields:
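```scala
class SocketServer(val config: KafkaConfig,
                   val metrics: Metrics,
                   val time: Time,
                   val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {

  // listener name -> EndPoint, one entry per configured listener
  private val endpoints: Map[ListenerName, EndPoint] = config.listeners.map(l => l.listenerName -> l).toMap
  // number of Processor threads per listener (num.network.threads)
  private val numProcessorThreads = config.numNetworkThreads
  // total number of Processor threads across all listeners
  private val totalProcessorThreads = numProcessorThreads * endpoints.size
  // capacity of the shared request queue (queued.max.requests)
  private val maxQueuedRequests = config.queuedMaxRequests
  // default per-IP connection limit (max.connections.per.ip)
  private val maxConnectionsPerIp = config.maxConnectionsPerIp
  // per-host overrides of that limit (max.connections.per.ip.overrides)
  private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides

  // channel through which Processors and Handlers exchange requests and responses
  val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
  // one Acceptor per EndPoint
  private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
  // all Processor threads, indexed by global Processor id
  private val processors = new Array[Processor](totalProcessorThreads)
  // tracks the number of connections per IP
  private var connectionQuotas: ConnectionQuotas = _
}
```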
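See the comments for the meaning of each field. EndPoint encapsulates a listener's host, port, and network protocol, while RequestChannel defines the channel through which Processors and Handlers exchange data. Its fields are defined as follows: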
```scala
class RequestChannel(val numProcessors: Int, // total number of Processor threads
                     val queueSize: Int      // capacity of the request queue
                    ) extends KafkaMetricsGroup {
  // callbacks notified (with the Processor id) when a response is enqueued
  private var responseListeners: List[Int => Unit] = Nil
  // request queue shared by all Processor threads
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
  // one response queue per Processor thread
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
}
```
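To make this queue layout concrete, here is a minimal, self-contained sketch built on java.util.concurrent queues. MiniRequestChannel, Req and Resp are hypothetical names. Making the per-Processor response queues unbounded LinkedBlockingQueues is an assumption of the sketch. It mirrors only the structure described here (one bounded request queue shared by all Processors, one response queue per Processor) and is not Kafka's RequestChannel.

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, LinkedBlockingQueue, TimeUnit}

// Hypothetical request/response types: each request remembers which Processor read it.
final case class Req(processorId: Int, payload: String)
final case class Resp(processorId: Int, payload: String)

// Sketch: one bounded request queue shared by all Processors,
// plus one (assumed unbounded) response queue per Processor.
class MiniRequestChannel(numProcessors: Int, queueSize: Int) {
  private val requestQueue = new ArrayBlockingQueue[Req](queueSize)
  private val responseQueues: Array[BlockingQueue[Resp]] =
    Array.fill[BlockingQueue[Resp]](numProcessors)(new LinkedBlockingQueue[Resp]())

  // Called by a Processor; blocks while the shared queue is full.
  def sendRequest(r: Req): Unit = requestQueue.put(r)

  // Called by a Handler; returns null if nothing arrives within the timeout.
  def receiveRequest(timeoutMs: Long): Req = requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)

  // Called by a Handler; routes the response to the Processor that read the request.
  def sendResponse(r: Resp): Unit = responseQueues(r.processorId).put(r)

  // Called by a Processor; non-blocking, returns null when its queue is empty.
  def receiveResponse(processorId: Int): Resp = responseQueues(processorId).poll()
}
```

Because the shared queue is bounded, put blocks once it holds queueSize requests. In this sketch a full queue therefore throttles the Processors.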
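RequestChannel wraps the request queue and the response queues. Note that the request queue is shared by all Processor threads, whereas each Processor thread has its own dedicated response queue. A Processor writes the requests it reads into the request queue, and takes responses off its own response queue to send them back to the requester. A Handler takes requests off the request queue and processes them. It then writes each response into the response queue of the Processor that originally read the request. The Acceptor, Processor, and Handler implementations are analyzed in their own sections below. First, let us look at how SocketServer starts up, which is implemented in SocketServer#startup: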
```scala
def startup() {
  synchronized {
    connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

    val sendBufferSize = config.socketSendBufferBytes
    val recvBufferSize = config.socketReceiveBufferBytes
    val brokerId = config.brokerId

    var processorBeginIndex = 0
    // one Acceptor per configured listener
    config.listeners.foreach { endpoint =>
      val listenerName = endpoint.listenerName
      val securityProtocol = endpoint.securityProtocol
      val processorEndIndex = processorBeginIndex + numProcessorThreads

      // create num.network.threads Processors for this listener
      for (i <- processorBeginIndex until processorEndIndex)
        processors(i) = this.newProcessor(i, connectionQuotas, listenerName, securityProtocol)

      val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
        processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
      acceptors.put(endpoint, acceptor)
      Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
      // block until the Acceptor thread signals that it has started
      acceptor.awaitStartup()

      processorBeginIndex = processorEndIndex
    }
  }

  info("Started " + acceptors.size + " acceptor threads")
}
```
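During startup, SocketServer iterates over the listeners configured for the current broker (config.listeners) and creates an Acceptor for each one. It then creates the configured number of Processor instances (the num.network.threads setting) for each Acceptor, giving each listener a contiguous range of global Processor ids. Finally, it starts the Acceptor thread and waits (awaitStartup) until that thread has finished starting.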
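The Processor-id bookkeeping in startup (processorBeginIndex / processorEndIndex) can be summarized as a pure function. ProcessorLayout is a hypothetical helper, and listeners are identified by plain strings for brevity:

```scala
// Hypothetical helper mirroring the processorBeginIndex/processorEndIndex bookkeeping.
object ProcessorLayout {
  // For listener number i, returns the contiguous range of global Processor ids
  // [i * n, (i + 1) * n), where n = numNetworkThreads.
  def assign(listeners: Seq[String], numNetworkThreads: Int): Map[String, Range] =
    listeners.zipWithIndex.map { case (listener, i) =>
      listener -> (i * numNetworkThreads until (i + 1) * numNetworkThreads)
    }.toMap
}
```

With L listeners this yields numNetworkThreads * L ids in total, which matches how totalProcessorThreads is computed.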
Acceptor Component
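The Acceptor accepts connection requests from clients and other broker nodes, creates the corresponding socket connection, and hands it to a Processor. The Acceptor class defines the following fields: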
```scala
private[kafka] class Acceptor(val endPoint: EndPoint,             // the listener this Acceptor serves
                              val sendBufferSize: Int,            // socket send buffer size
                              val recvBufferSize: Int,            // socket receive buffer size
                              brokerId: Int,                      // broker id
                              processors: Array[Processor],       // Processor threads bound to this Acceptor
                              connectionQuotas: ConnectionQuotas  // limits the number of connections per IP
                             ) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

  // java.nio.channels.Selector (imported as NSelector), used to wait for OP_ACCEPT events
  private val nioSelector = NSelector.open()
  // server socket channel bound to the listener's host and port
  val serverChannel: ServerSocketChannel = this.openServerSocket(endPoint.host, endPoint.port)
}
```
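SocketServer creates and starts the Acceptor thread during startup. As the definition above shows, Acceptor extends the abstract class AbstractServerThread, which implements the Runnable interface and provides basic thread-management methods. The Acceptor's execution logic lives in Acceptor#run: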
```scala
def run() {
  // listen for new-connection events on the server socket
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  this.startupComplete()
  try {
    var currentProcessor = 0
    while (isRunning) {
      try {
        val ready = nioSelector.select(500)
        if (ready > 0) {
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              if (key.isAcceptable)
                this.accept(key, processors(currentProcessor))
              else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")

              // round-robin: the next connection goes to the next Processor
              currentProcessor = (currentProcessor + 1) % processors.length
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      } catch {
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    this.swallowError(serverChannel.close())
    this.swallowError(nioSelector.close())
    this.shutdownComplete()
  }
}

def accept(key: SelectionKey, processor: Processor) {
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept()
  try {
    // count the connection against its IP; throws TooManyConnectionsException over the limit
    connectionQuotas.inc(socketChannel.socket().getInetAddress)
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
      socketChannel.socket().setSendBufferSize(sendBufferSize)
    processor.accept(socketChannel)
  } catch {
    case e: TooManyConnectionsException =>
      info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
      this.close(socketChannel)
  }
}
```
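The logic above is a typical NIO server. The Acceptor repeatedly waits for OP_ACCEPT events. When a new connection request arrives, it creates and configures the connection's SocketChannel and hands it to a Processor by calling Processor#accept. Because one Acceptor owns several Processor threads, a simple round-robin scheme spreads the work: each new connection goes to the next Processor in turn. Note that this balances the number of connections per Processor, not the actual request load.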
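The round-robin choice reduces to an index that advances modulo the number of Processors after every accepted connection. A minimal sketch; RoundRobin is a hypothetical class, not part of Kafka:

```scala
// Hypothetical round-robin selector: returns items(current), then advances current
// modulo the pool size, as the Acceptor does with currentProcessor.
final class RoundRobin[T](items: IndexedSeq[T]) {
  require(items.nonEmpty, "need at least one processor")
  private var current = 0

  def next(): T = {
    val chosen = items(current)
    current = (current + 1) % items.length
    chosen
  }
}
```

For example, with three Processors, five connections are assigned p0, p1, p2, p0, p1.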
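For each new connection, the Acceptor first uses the ConnectionQuotas object to track how many connections the requesting IP has. The default limit comes from max.connections.per.ip and can be overridden per host with max.connections.per.ip.overrides. When that limit is exceeded, ConnectionQuotas#inc throws a TooManyConnectionsException, and the Acceptor logs the rejection and closes the connection's channel.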
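A per-IP connection counter with overrides can be sketched as follows. SimpleConnectionQuotas and TooManyConnections are hypothetical stand-ins modelled on ConnectionQuotas and TooManyConnectionsException. Two behaviours are assumptions of this sketch: overrides are keyed by textual IP address, and the count is incremented before the check, so a rejected connection must still be released with dec when its channel is closed.

```scala
import java.net.InetAddress
import scala.collection.mutable

// Hypothetical exception carrying the offending address and the limit that applied.
final class TooManyConnections(val ip: InetAddress, val count: Int)
  extends RuntimeException(s"Too many connections from $ip (maximum = $count)")

// Sketch of per-IP connection counting with per-address overrides.
class SimpleConnectionQuotas(defaultMax: Int, overrides: Map[String, Int]) {
  private val counts = mutable.Map[InetAddress, Int]()

  // Called when a connection is accepted; increments first, then throws if over the limit.
  def inc(addr: InetAddress): Unit = synchronized {
    val count = counts.getOrElse(addr, 0) + 1
    counts.put(addr, count)
    val max = overrides.getOrElse(addr.getHostAddress, defaultMax)
    if (count > max) throw new TooManyConnections(addr, max)
  }

  // Called when a connection (accepted or rejected) is closed.
  def dec(addr: InetAddress): Unit = synchronized {
    counts.get(addr) match {
      case Some(1) => counts.remove(addr)
      case Some(n) => counts.put(addr, n - 1)
      case None    => throw new IllegalArgumentException(s"No connection tracked for $addr")
    }
  }

  def get(addr: InetAddress): Int = synchronized(counts.getOrElse(addr, 0))
}
```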
Processor Component
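A Processor reads requests from requesters and sends responses back to them, but it does not process the requests itself; it delegates them to the Handler threads. The Processor's key fields are defined as follows: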
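```scala
val requestChannel: RequestChannel
// connections handed over by the Acceptor, waiting to be registered with this Processor's Selector
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
// responses currently being sent, keyed by connection id
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
```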
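After accepting a connection, the Acceptor thread obtains a SocketChannel for it and calls Processor#accept to hand it over to the chosen Processor thread. The channel is recorded in the Processor#newConnections field, and the Processor thread is woken up. Processor#accept is implemented as follows: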
```scala
def accept(socketChannel: SocketChannel) {
  newConnections.add(socketChannel)
  // wake up the Processor so that it registers the new connection promptly
  this.wakeup()
}
```
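The wakeup matters because the Processor may be blocked inside its selector's select call when a connection arrives. Waking the selector lets the Processor notice the new entry promptly. Below is a minimal sketch using java.nio.channels.Selector directly. HandOff is a hypothetical class, and connections are represented by strings instead of SocketChannels:

```scala
import java.nio.channels.Selector
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical Acceptor -> Processor hand-off: the Acceptor thread enqueues into a
// lock-free queue and wakes the selector; only the Processor thread drains the queue.
class HandOff {
  private val newConnections = new ConcurrentLinkedQueue[String]()
  val selector: Selector = Selector.open()

  // Runs on the Acceptor thread.
  def accept(conn: String): Unit = {
    newConnections.add(conn)
    selector.wakeup() // a blocked (or the next) select() returns immediately
  }

  // Runs on the Processor thread at the start of each loop iteration.
  def drain(): List[String] = {
    val buf = List.newBuilder[String]
    var c = newConnections.poll()
    while (c != null) { buf += c; c = newConnections.poll() }
    buf.result()
  }
}
```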
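Processor also extends AbstractServerThread, so it is a thread implementation as well. When an Acceptor object is constructed, it iterates over the Processors assigned to it and starts a thread for each one: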
```scala
synchronized {
  processors.foreach { processor =>
    Utils.newThread(
      s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
      processor, false).start()
  }
}
```
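Both Acceptor and Processor rely on lifecycle helpers from AbstractServerThread (startupComplete, awaitStartup, shutdownComplete, isRunning). One plausible way to build such helpers uses CountDownLatch and AtomicBoolean. ServerThreadSketch and Ticker below are hypothetical and do not reproduce Kafka's class; for instance, they do not wake up a selector on shutdown:

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical base class: a Runnable with a running flag and latches that let the
// creator wait for startup and for shutdown.
abstract class ServerThreadSketch extends Runnable {
  private val startupLatch = new CountDownLatch(1)
  private val shutdownLatch = new CountDownLatch(1)
  private val alive = new AtomicBoolean(true)

  def isRunning: Boolean = alive.get
  protected def startupComplete(): Unit = startupLatch.countDown()
  protected def shutdownComplete(): Unit = shutdownLatch.countDown()
  def awaitStartup(): Unit = startupLatch.await()

  // Ask the loop to stop, then block until run() has signalled completion.
  def shutdown(): Unit = { alive.set(false); shutdownLatch.await() }
}

// Example subclass: loops until asked to stop.
class Ticker extends ServerThreadSketch {
  @volatile var ticks = 0
  override def run(): Unit = {
    startupComplete()
    try { while (isRunning) { ticks += 1; Thread.sleep(1) } }
    finally shutdownComplete()
  }
}
```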
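Once its thread has started, Processor#run loops continuously over the connections assigned by the Acceptor. It reads requests, wraps them, and puts them on the request queue, where they wait for the Handler threads. For responses to requests that have already been processed, the Processor handles each one according to its response type. Processor#run is implemented as follows: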
```scala
override def run() {
  this.startupComplete()
  while (isRunning) {
    try {
      this.configureNewConnections()   // step 1
      this.processNewResponses()       // step 2
      this.poll()                      // step 3
      this.processCompletedReceives()  // step 4
      this.processCompletedSends()     // step 5
      this.processDisconnected()       // step 6
    } catch {
      case e: ControlThrowable => throw e
      case e: Throwable => error("Processor got uncaught exception.", e)
    }
  }

  debug("Closing selector - processor " + id)
  this.swallowError(closeAll())
  this.shutdownComplete()
}
```
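Once the Processor thread has started, it calls Processor#startupComplete to mark itself as started and then enters a loop that performs the following steps in order:
1. Register each SocketChannel that the Acceptor assigned to this Processor with its Selector, with interest in OP_READ events, so that request data can be read;
2. Drain the Processor's own response queue, handling each response according to its type;
3. Call Selector#poll to perform network I/O. This writes out queued responses and reads incoming data, placing received requests, completed response sends, and disconnected connections into the Selector's completedReceives, completedSends, and disconnected collections respectively;
4. Process the Selector's completedReceives collection, wrapping each request and putting it on the request queue for the Handler threads;
5. Process the Selector's completedSends collection, removing each successfully sent response from the local inflightResponses map;
6. Process the Selector's disconnected collection, removing responses for the disconnected connections from inflightResponses and decrementing the connection count for the remote IP.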
Let us now examine each step in detail, starting with step 1, which is implemented in Processor#configureNewConnections:
```scala
private def configureNewConnections() {
  while (!newConnections.isEmpty) {
    val channel = newConnections.poll()
    try {
      debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
      val localHost = channel.socket().getLocalAddress.getHostAddress
      val localPort = channel.socket().getLocalPort
      val remoteHost = channel.socket().getInetAddress.getHostAddress
      val remotePort = channel.socket().getPort
      val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
      // register the channel with this Processor's Selector (interest: OP_READ)
      selector.register(connectionId, channel)
    } catch {
      case NonFatal(e) =>
        val remoteAddress = channel.getRemoteAddress
        this.close(channel)
        error(s"Processor $id closed connection from $remoteAddress", e)
    }
  }
}
```
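As described earlier, the Acceptor records each connection's SocketChannel in Processor#newConnections. This step drains those channels. For each one, it builds a connection id from the local and remote addresses and ports, then registers the channel with the Processor's Selector (with interest in OP_READ) so that request data can be read from it.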
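The connection id built here is a string combining the local and remote addresses and ports. Below is a hypothetical re-creation (ConnId). The format localHost:localPort-remoteHost:remotePort and the parsing rules are assumptions of this sketch, which relies on the hosts being IP literals that contain no '-':

```scala
// Hypothetical connection id with the assumed format "localHost:localPort-remoteHost:remotePort".
final case class ConnId(localHost: String, localPort: Int, remoteHost: String, remotePort: Int) {
  override def toString: String = s"$localHost:$localPort-$remoteHost:$remotePort"
}

object ConnId {
  // Splits "host:port" on the last ':'; None if the port is missing or not a number.
  private def hostPort(s: String): Option[(String, Int)] = {
    val i = s.lastIndexOf(':')
    if (i <= 0) None
    else scala.util.Try(s.substring(i + 1).toInt).toOption.map(port => (s.substring(0, i), port))
  }

  // Returns None when the string does not have the expected shape.
  def fromString(s: String): Option[ConnId] = s.split("-") match {
    case Array(local, remote) =>
      for {
        (lh, lp) <- hostPort(local)
        (rh, rp) <- hostPort(remote)
      } yield ConnId(lh, lp, rh, rp)
    case _ => None
  }
}
```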
Step 2 drains the current Processor's response queue and handles each response according to its type. It is implemented in Processor#processNewResponses:
```scala
private def processNewResponses() {
  var curr = requestChannel.receiveResponse(id)
  while (curr != null) {
    try {
      curr.responseAction match {
        case RequestChannel.NoOpAction =>
          // nothing to send back: unmute the channel so the next request can be read
          curr.request.updateRequestMetrics()
          trace("Socket server received empty response to send, registering for read: " + curr)
          val channelId = curr.request.connectionId
          if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
            selector.unmute(channelId)
        case RequestChannel.SendAction =>
          this.sendResponse(curr)
        case RequestChannel.CloseConnectionAction =>
          curr.request.updateRequestMetrics()
          trace("Closing socket connection actively according to the response code.")
          this.close(selector, curr.request.connectionId)
      }
    } finally {
      curr = requestChannel.receiveResponse(id)
    }
  }
}
```
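Recall that a Processor does not process requests itself; it only wraps them and hands them to the Handler threads. Each Processor maintains a response queue, and after a Handler finishes a request it puts the response on that Processor's response queue. This step drains the queue and handles each response according to its action:
- NoOpAction: there is nothing to send back (for example, a produce request with acks=0). The Processor unmutes the channel, re-enabling OP_READ so that it can continue reading new requests.
- SendAction: the response must be sent to the requester. The Processor calls Processor#sendResponse, which queues the response on the Selector and records it in Processor#inflightResponses to mark it as being sent.
- CloseConnectionAction: the connection must be closed. The Processor calls Processor#close to close the channel and decrement the connection count for that IP.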
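The three cases can be modelled with a sealed trait and a pattern match. All names below (NoOp, Send, CloseConnection, ResponseProcessorSketch) are hypothetical, and the per-connection state is reduced to three in-memory collections:

```scala
import scala.collection.mutable

// Hypothetical model of the three response actions.
sealed trait ResponseAction
case object NoOp extends ResponseAction
case object Send extends ResponseAction
case object CloseConnection extends ResponseAction

final case class Response(connectionId: String, action: ResponseAction)

// Sketch of what a Processor does with each response taken off its queue.
class ResponseProcessorSketch {
  val muted = mutable.Set[String]()
  val inflight = mutable.Map[String, Response]()
  val closed = mutable.Set[String]()

  def handle(r: Response): Unit = r.action match {
    case NoOp            => muted -= r.connectionId      // nothing to send: resume reading
    case Send            => inflight(r.connectionId) = r // being sent; stays muted until done
    case CloseConnection => closed += r.connectionId; muted -= r.connectionId
  }
}
```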
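Step 3 (Selector#poll) sends the responses queued in step 2. It also places received requests, completed response sends, and disconnected connections into the Selector's completedReceives, completedSends, and disconnected collections. Steps 4 to 6 process these three collections respectively. Let us first look at step 4, which is implemented in Processor#processCompletedReceives: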
```scala
private def processCompletedReceives() {
  selector.completedReceives.asScala.foreach { receive =>
    try {
      val openChannel = selector.channel(receive.source)
      val session = {
        // the channel may already be closing; fall back to the closing channel
        val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
        RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
      }
      val req = RequestChannel.Request(
        processor = id,
        connectionId = receive.source,
        session = session,
        buffer = receive.payload,
        startTimeMs = time.milliseconds,
        listenerName = listenerName,
        securityProtocol = securityProtocol)
      // enqueue on the shared request queue for the Handler threads
      requestChannel.sendRequest(req)
      // stop reading from this connection until its response has been sent
      selector.mute(receive.source)
    } catch {
      case e @ (_: InvalidRequestException | _: SchemaException) =>
        error(s"Closing socket for ${receive.source} because of error", e)
        close(selector, receive.source)
    }
  }
}
```
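This step iterates over the Selector's completedReceives collection. For each received request, it reads the request data, wraps it in a Request object, and places it on the request queue held by Processor#requestChannel, where it waits for a Handler thread. It also mutes the corresponding channel, removing its interest in OP_READ, so that no new request data is read from that connection until the current request has been handled. The Request object is enqueued by calling RequestChannel#sendRequest, which puts it on the request queue shared by all Processors; the Handler threads later consume that queue.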
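Muting is what limits each connection to one outstanding request. While a connection is muted, further requests stay unread, so responses go back in request order. The hypothetical ConnectionModel below simulates one connection under that rule, with the socket buffer modelled as a queue:

```scala
import scala.collection.mutable

// Hypothetical model of one connection under the mute/unmute rule.
final class ConnectionModel(val id: String) {
  private val pending = mutable.Queue[String]() // data still sitting in the socket buffer
  private var muted = false
  val dispatched = mutable.ListBuffer[String]() // requests handed to the request queue

  def arrive(request: String): Unit = { pending.enqueue(request); readIfPossible() }

  // Called once the response to the current request has been fully sent.
  def responseSent(): Unit = { muted = false; readIfPossible() }

  private def readIfPossible(): Unit =
    if (!muted && pending.nonEmpty) {
      dispatched += pending.dequeue() // hand the request to the Handlers
      muted = true                    // stop reading until its response is sent
    }
}
```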
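Step 5 iterates over the Selector's completedSends collection, which holds the responses that have been sent successfully. Each of these responses is removed from Processor#inflightResponses, and its connection is unmuted so that the next request can be read. The implementation is as follows: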
```scala
private def processCompletedSends() {
  selector.completedSends.asScala.foreach { send =>
    val resp = inflightResponses.remove(send.destination).getOrElse {
      throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
    }
    resp.request.updateRequestMetrics()
    // response fully sent: resume reading from this connection
    selector.unmute(send.destination)
  }
}
```
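Step 6 iterates over the Selector's disconnected collection. For each disconnected connection, any response still recorded as in flight is removed from Processor#inflightResponses, and the connection count for the remote IP is decremented. The implementation is as follows: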
```scala
private def processDisconnected() {
  selector.disconnected.asScala.foreach { connectionId =>
    val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
      throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
    }.remoteHost
    // the connection is gone: drop any response still being sent on it
    inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
    // release one connection slot for the remote IP
    connectionQuotas.dec(InetAddress.getByName(remoteHost))
  }
}
```
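The disconnect bookkeeping can be sketched as a function over two maps. DisconnectCleanup is hypothetical. It assumes the localHost:localPort-remoteHost:remotePort id format and counts connections per remote host string rather than per InetAddress:

```scala
import scala.collection.mutable

// Hypothetical disconnect cleanup: drop any half-sent response for the connection
// and release one slot of the remote host's connection count.
object DisconnectCleanup {
  def onDisconnected(connectionId: String,
                     inflight: mutable.Map[String, String],
                     perHostCount: mutable.Map[String, Int]): Unit = {
    val remote = connectionId.split("-") match {
      case Array(_, r) => r
      case _ => throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
    }
    val i = remote.lastIndexOf(':')
    require(i > 0, s"connectionId has unexpected format: $connectionId")
    val remoteHost = remote.substring(0, i)

    inflight.remove(connectionId) // the response can no longer be delivered
    perHostCount.get(remoteHost) match {
      case Some(1) => perHostCount.remove(remoteHost)
      case Some(n) => perHostCount(remoteHost) = n - 1
      case None    => ()
    }
  }
}
```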
Handler Component
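Once a Processor has placed a Request object on the globally shared request queue, the Handler threads consume that queue and process the requests. Each response is written into the response queue of the Processor that read the request. Handlers are implemented by two classes, KafkaRequestHandler and KafkaRequestHandlerPool; KafkaRequestHandlerPool wraps KafkaRequestHandler and manages the Handler threads. KafkaRequestHandlerPool is fairly simple, so we focus on KafkaRequestHandler. It implements the Runnable interface, and its KafkaRequestHandler#run method is implemented as follows: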
```scala
override def run() {
  while (true) {
    try {
      var req: RequestChannel.Request = null
      while (req == null) {
        // wait up to 300 ms for a request and record the idle time
        val startSelectTime = time.nanoseconds
        req = requestChannel.receiveRequest(300)
        val idleTime = time.nanoseconds - startSelectTime
        aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
      }

      // AllDone is the shutdown sentinel
      if (req eq RequestChannel.AllDone) {
        debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
        return
      }
      req.requestDequeueTimeMs = time.milliseconds
      trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
      apis.handle(req)
    } catch {
      case e: Throwable => error("Exception when handling request", e)
    }
  }
}
```
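This method contains the Handler's entire logic. It first calls RequestChannel#receiveRequest to wait, with a timeout, for a request from the global request queue. If the object it receives is RequestChannel.AllDone, a special sentinel request that signals shutdown, the Handler thread exits. Otherwise the Handler calls KafkaApis#handle to process the request, and the resulting response is written to the response queue of the corresponding Processor.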
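The receive loop with a timeout and a shutdown sentinel is the classic poison-pill pattern. Below is a minimal sketch in which HandlerSketch, Request and AllDone are hypothetical stand-ins. As in the code above, the sentinel is recognized by reference equality (eq):

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer

// Hypothetical Handler loop: poll with a timeout, exit on the sentinel object.
object HandlerSketch {
  final class Request(val body: String)
  val AllDone = new Request("<shutdown>")

  def runHandler(queue: ArrayBlockingQueue[Request], handled: ListBuffer[String]): Unit = {
    while (true) {
      var req: Request = null
      while (req == null)
        req = queue.poll(300, TimeUnit.MILLISECONDS) // null on timeout, so keep waiting
      if (req eq AllDone) return
      handled += req.body // stands in for apis.handle(req)
    }
  }
}
```

Each thread consumes exactly one sentinel before exiting, so stopping N handler threads this way requires enqueuing N sentinels.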
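KafkaApis is one of Kafka's core classes. It dispatches each type of request to the appropriate component and defines a dedicated method for each request type. The KafkaApis#handle method called above is implemented as follows: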
```scala
def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
      format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
    // dispatch on the request type
    ApiKeys.forId(request.requestId) match {
      case ApiKeys.PRODUCE => handleProducerRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
      case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
      case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
      case requestId => throw new KafkaException("Unknown api code " + requestId)
    }
  } catch {
    case e: Throwable => // exception handling omitted in this excerpt
  } finally
    request.apiLocalCompleteTimeMs = time.milliseconds
}
```
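The ApiKeys enum defines a unique identifier for each request type. KafkaApis delegates each request to the handle* method for its type, and these methods broadly follow the same pattern:
1. Parse the request object of the corresponding type;
2. Check authorization;
3. Delegate the request to the appropriate component;
4. Send the response, or define a response callback that the component invokes when it is done.
We will not go into these implementations here; they will be covered when we analyze the individual components.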
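The four-step template can be sketched generically: look up the handler for an API key, check authorization, delegate, and reply through a callback. ApiDispatch, its ACL map and the AUTHORIZATION_FAILED string are hypothetical and do not reflect Kafka's actual authorizer or error codes:

```scala
// Hypothetical handle* template: dispatch by API key, authorize, delegate with a callback.
object ApiDispatch {
  final case class Request(apiKey: Short, principal: String, body: String)

  type Respond = String => Unit

  // Hypothetical ACL check: which principals may call which API keys.
  def authorized(principal: String, apiKey: Short, acl: Map[Short, Set[String]]): Boolean =
    acl.get(apiKey).exists(_.contains(principal))

  def handle(req: Request,
             acl: Map[Short, Set[String]],
             handlers: Map[Short, (String, Respond) => Unit],
             respond: Respond): Unit =
    handlers.get(req.apiKey) match {
      case None => throw new IllegalArgumentException(s"Unknown api code ${req.apiKey}")
      case Some(_) if !authorized(req.principal, req.apiKey, acl) => respond("AUTHORIZATION_FAILED")
      case Some(h) => h(req.body, respond) // the component invokes respond when it is done
    }
}
```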
Summary
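In this article we analyzed the design and implementation of Kafka's network model. Clients talk to the cluster, and brokers talk to each other, entirely through requests, so the network layer must provide low latency and high throughput. Instead of the traditional thread-per-connection model, Kafka adopts the Reactor pattern, implemented with Java NIO. The model consists of three main components. The Acceptor accepts connections, the Processor reads requests and sends responses, and the Handler performs the actual request processing. These design ideas are well worth borrowing in our own projects.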