Kafka 源码解析:网络交互模型

由上一篇分析可知,在 broker 节点启动过程中会创建一个 SocketServer 类型的对象,并调用其 SocketServer#startup 方法执行组件的启动过程。SocketServer 是 Kafka 对外提供网络服务的核心实现类,在 Kafka 运行过程中用于接收来自客户端和其它 broker 节点的网络请求。考虑到性能上的需求,SocketServer 采用了 Reactor 模式,并基于 java NIO 实现。

参考如下示意图,Kafka 为 broker 所在宿主机的每一张网卡创建并绑定了一个 Acceptor 组件,用于接收并处理所有的连接请求;每个 Acceptor 组件维护多个 Processor 线程,其中每个 Processor 拥有专属的 Selector,用于从连接中读取请求和写回响应;每个 Acceptor 组件同时维护多个 Handler 线程,用于处理请求并生成响应传递给 Processor,而 Handler 与 Processor 之间通过请求队列进行通信。

image

SocketServer 组件

SocketServer 是整个 kafka server 网络模型的管家类,主要用于构建和启动整个网络模块。SocketServer 类的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class SocketServer(val config: KafkaConfig,
val metrics: Metrics,
val time: Time,
val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {

/** 封装服务器对应的多张网卡,kafka 可以同时监听这些 IP 和端口,每个 EndPoint 对应一个 Acceptor */
private val endpoints: Map[ListenerName, EndPoint] = config.listeners.map(l => l.listenerName -> l).toMap
/** 每个 Acceptor 对应的 Processor 对应的线程数 */
private val numProcessorThreads = config.numNetworkThreads
/** broker 节点上 Processor 线程总数 */
private val totalProcessorThreads = numProcessorThreads * endpoints.size
/** 请求队列中缓存的最大请求个数 */
private val maxQueuedRequests = config.queuedMaxRequests
/** 每个 IP 允许创建的最大连接数 */
private val maxConnectionsPerIp = config.maxConnectionsPerIp
/** 针对特定 IP 指定的允许创建的最大连接数,会覆盖 maxConnectionsPerIp 配置 */
private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
/** Processor 线程与 Handler 线程之间交换数据的通道 */
val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
/** Acceptor 对象集合,每个 EndPoint 对应一个 Acceptor */
private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
/** Processor 对象集合,封装所有的 Processor 对象 */
private val processors = new Array[Processor](totalProcessorThreads)
/** 用于控制每个 IP 上的最大连接数 */
private var connectionQuotas: ConnectionQuotas = _

// ... 省略方法定义

}

各字段的含义参考注释,其中 EndPoint 类用于封装服务器对应的 host、port,以及网络协议等信息,而 RequestChannel 类定义了 Processor 和 Handler 之间交换数据的通道,该类的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class RequestChannel(val numProcessors: Int, // Processor 线程总数
val queueSize: Int // 请求队列的大小
) extends KafkaMetricsGroup {

/** 响应监听器列表,当 Handler 往响应队列写回响应数据时唤醒对应的 Processor 线程进行处理 */
private var responseListeners: List[Int => Unit] = Nil
/** 请求队列,所有的 Processor 共用一个 */
private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
/** 响应队列,每个 Processor 对应一个响应队列 */
private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)

// ... 省略方法定义

}

RequestChannel 封装了请求队列和响应队列,这里需要注意的一点是请求队列是 Processor 线程共享的,而响应队列则是每个 Processor 线程专属的。Processor 负责将读取到的请求写入请求队列中,并从自己的响应队列中取出响应对象发送给请求方。Handler 负责从请求队列中读取请求进行处理,并在处理完成之后将响应对象写入到之前读取该请求的 Processor 的响应队列中。关于 Acceptor、Processor 和 Handler 的实现下文会专门进行分析,这里我们先来看一下 SocketServer 的启动逻辑,位于 SocketServer#startup 方法中,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def startup() {
synchronized {

// 创建控制 IP 最大连接数的 ConnectionQuotas 对象
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

// 指定 socket send buffer 的大小(对应 socket.send.buffer.bytes 配置)
val sendBufferSize = config.socketSendBufferBytes
// 指定 socket receive buffer 的大小(对应 socket.receive.buffer.bytes 配置)
val recvBufferSize = config.socketReceiveBufferBytes
// 获取 broker 节点 ID
val brokerId = config.brokerId

var processorBeginIndex = 0
// 遍历为每个 EndPoint,创建并绑定对应的 Acceptor 和 Processor
config.listeners.foreach { endpoint =>
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
val processorEndIndex = processorBeginIndex + numProcessorThreads

// 按照指定的 processor 线程数,为每个 EndPoint 创建对应数量的 Processor 对象,
// 编号区间 [processorBeginIndex, processorEndIndex)
for (i <- processorBeginIndex until processorEndIndex)
processors(i) = this.newProcessor(i, connectionQuotas, listenerName, securityProtocol)

// 为当前 EndPoint 创建并绑定一个 Acceptor 对象
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
acceptors.put(endpoint, acceptor)

// 启动 Acceptor 线程
Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()

// 主线程等待 Acceptor 线程启动完成
acceptor.awaitStartup()

processorBeginIndex = processorEndIndex
}
}

info("Started " + acceptors.size + " acceptor threads")
}

SocketServer 启动过程中会遍历为当前 broker 节点上的每张网卡创建并绑定对应 Acceptor 对象,然后按照配置的 Processor 线程数(对应 num.network.threads 配置)为每个 Acceptor 创建并绑定对应数量的 Processor 实例,最后启动 Acceptor 线程。

Acceptor 组件

Acceptor 主要负责接收来自客户端和其它 broker 节点的请求,并创建对应的 socket 连接交由 Processor 进行处理。Acceptor 类的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private[kafka] class Acceptor(val endPoint: EndPoint, // 对应的网卡信息
val sendBufferSize: Int, // socket send buffer size
val recvBufferSize: Int, // socket receive buffer size
brokerId: Int, // broker 节点 id
processors: Array[Processor], // 绑定的 Processor 线程集合
connectionQuotas: ConnectionQuotas // 控制 IP 连接数的对象
) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

/** NIO Selector */
private val nioSelector = NSelector.open()
/** ServerSocketChannel 对象,监听对应网卡的指定端口 */
val serverChannel: ServerSocketChannel = this.openServerSocket(endPoint.host, endPoint.port)

// ... 省略方法定义

}

SocketServer 在启动过程中会创建并启动 Acceptor 线程,由上面的定义可以看出 Acceptor 继承自 AbstractServerThread 抽象类,而 AbstractServerThread 实现了 Runnable 接口,并提供了对线程的基本管理方法。Acceptor 的具体执行逻辑位于 Acceptor#run 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def run() {
// 注册监听 OP_ACCEPT 事件
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
// 标记当前线程启动完成,以便 SocketServer 能够继续为其它网卡创建并绑定对应的 Acceptor 线程
this.startupComplete()
try {
var currentProcessor = 0 // 当前生效的 processor 编号
while (isRunning) {
try {
// 等待关注的事件
val ready = nioSelector.select(500)
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
// 遍历处理接收到的请求
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
// 如果是 OP_ACCEPT 事件,则调用 accept 方法进行处理
if (key.isAcceptable)
this.accept(key, processors(currentProcessor))
else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
// 基于轮询算法选择下一个 Processor 处理下一次请求,负载均衡
currentProcessor = (currentProcessor + 1) % processors.length
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
} catch {
case e: ControlThrowable => throw e
case e: Throwable => error("Error occurred", e)
}
}
} finally {
debug("Closing server socket and selector.")
this.swallowError(serverChannel.close())
this.swallowError(nioSelector.close())
this.shutdownComplete()
}
}

def accept(key: SelectionKey, processor: Processor) {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
// 创建 SocketChannel 对象
val socketChannel = serverSocketChannel.accept()
try {
// 增加对应 IP 上的连接数,如果连接数超过阈值,则抛 TooManyConnectionsException 异常
connectionQuotas.inc(socketChannel.socket().getInetAddress)
// 配置 SocketChannel 对象,非阻塞模式
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socketChannel.socket().setSendBufferSize(sendBufferSize)

// 将 SocketChannel 交给 Processor 进行处理
processor.accept(socketChannel)
} catch {
// 连接数过多,关闭当前通道上的连接,并将连接计数减 1
case e: TooManyConnectionsException =>
info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
this.close(socketChannel)
}
}

上述方法的执行逻辑是一个典型的 NIO server 的实现。Acceptor 会循环监听 OP_ACCEPT 事件,当有新的连接请求到达时会创建并配置连接对应的 SocketChannel 对象,并交由 Processor 处理(调用 Processor#accept 方法)。我们知道一个 Acceptor 上绑定了多个 Processor 线程,为了保证各个 Processor 的负载均衡,这里使用了简单的轮询算法,逐个选择 Processor 线程处理请求。

对于新进来的请求,Acceptor 首先会使用 ConnectionQuotas 对象管理请求 IP 上的连接数,并在连接数超过配置的阈值(默认对应 max.connections.per.ip 配置,可以通过 max.connections.per.ip.overrides 配置覆盖默认配置)时触发限流机制,关闭当前连接的通道。

Processor 组件

Processor 主要负责读取来自请求方的请求,并向请求方发送响应,但是本身不负责对请求进行处理,而是委托给相应的 Handler 线程进行处理。Processor 中几个重要的字段定义如下:

1
2
3
4
5
6
/** Processor 与 Handler 线程之间传递请求数据的队列 */
val requestChannel: RequestChannel
/** 记录分配给当前 Processor 的待处理的 SocketChannel 对象 */
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
/** 缓存未发送给客户端的响应,由于客户端不会进行确认,所以服务端在发送成功之后会将其移除 */
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()

Acceptor 线程在收到连接请求之后会将请求封装成 SocketChannel 对象,并调用 Processor#accept 方法将其分配给对应的 Processor 线程进行处理,该对象会被记录到 Processor#newConnections 字段中,并唤醒对应的 Processor 线程。方法 Processor#accept 的实现如下:

1
2
3
4
5
6
def accept(socketChannel: SocketChannel) {
// 将 Acceptor 分配的 SocketChannel 对象缓存到同步队列中
newConnections.add(socketChannel)
// 唤醒 Processor 线程处理队列
this.wakeup() // 本质上调用 NIO Server 的 wakeup 方法
}

Processor 同样继承了 AbstractServerThread 抽象类,所以也是一个线程类实现。在创建 Acceptor 对象的过程中会遍历启动分配给当前 Acceptor 的 Processor 线程。

1
2
3
4
5
6
7
8
synchronized {
// 遍历启动分配给当前 Acceptor 的 Processor 线程
processors.foreach { processor =>
Utils.newThread(
s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
processor, false).start()
}
}

Processor 的 Processor#run 方法在线程启动之后会一直循环处理 Acceptor 分配的请求,读取并封装请求数据到队列中,然后等待 Handler 线程处理。对于已经处理完成的请求对应的响应对象,Processor 线程会依据响应类型分而治之。方法 Processor#run 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
override def run() {
// 标识当前线程启动完成
this.startupComplete()
while (isRunning) {
try {
// 1. 遍历获取分配给当前 Processor 的 SocketChannel 对象,注册 OP_READ 事件
this.configureNewConnections()

// 2. 遍历处理当前 Processor 的响应队列,依据响应类型进行处理
this.processNewResponses()

// 3. 发送缓存的响应对象给客户端
this.poll()

// 4.
// 遍历处理 poll 操作放置在 Selector 的 completedReceives 队列中的请求,
// 封装请求信息为 Request 对象,并记录到请求队列中等待 Handler 线程处理,
// 同时标记当前 Selector 暂时不再接收新的请求
this.processCompletedReceives()

// 5.
// 遍历处理 poll 操作放置在 Selector 的 completedSends 队列中的请求,
// 将其从 inflightResponses 集合中移除,并标记当前 Selector 可以继续读取数据
this.processCompletedSends()

// 6.
// 遍历处理 poll 操作放置在 Selector 的 disconnected 集合中的断开的连接,
// 将连接对应的所有响应从 inflightResponses 中移除,同时更新对应 IP 的连接数
this.processDisconnected()
} catch {
case e: ControlThrowable => throw e
case e: Throwable =>
error("Processor got uncaught exception.", e)
}
}

debug("Closing selector - processor " + id)
// 关闭所有的连接以及选择器
this.swallowError(closeAll())
this.shutdownComplete()
}

当 Processor 线程启动完成后会调用 Processor#startupComplete 方法标识当前线程启动完成,然后开始进入循环,依次执行以下操作:

  1. 遍历处理 Acceptor 分配给当前 Processor 的 SocketChannel 对象,注册 OP_READ 事件读取请求数据;
  2. 遍历处理 Processor 自己的响应队列,按照响应类型分别处理;
  3. 发送缓存的响应给请求方,并将读取到的请求、已经发送成功的请求,以及断开的连接分别放置到 Selector 的 completedReceives、completedSends 和 disconnected 集合中;
  4. 处理 Selector 的 completedReceives 集合,封装请求数据到请求队列中,等待 Handler 线程处理;
  5. 处理 Selector 的 completedSends 集合,将已经发送成功的响应从本地 inflightResponses 集合中移除;
  6. 处理 Selector 的 disconnected 集合,将已经断开的连接上的响应从 inflightResponses 集合中移除。

下面对各个步骤逐一进行深入分析,首先来看 步骤 1 ,实现位于 Processor#configureNewConnections 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private def configureNewConnections() {
while (!newConnections.isEmpty) {
// 获取待处理 SocketChannel 对象
val channel = newConnections.poll()
try {
debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
val localHost = channel.socket().getLocalAddress.getHostAddress
val localPort = channel.socket().getLocalPort
val remoteHost = channel.socket().getInetAddress.getHostAddress
val remotePort = channel.socket().getPort
val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
// 注册 OP_READ 事件
selector.register(connectionId, channel)
} catch {
// 对于不致命的异常,则捕获并关闭对应的通道
case NonFatal(e) =>
val remoteAddress = channel.getRemoteAddress
this.close(channel)
error(s"Processor $id closed connection from $remoteAddress", e)
}
}
}

前面我们曾介绍过 Acceptor 会将请求对应的 SocketChannel 对象记录到 Processor#newConnections 字段中,而这一步的主要任务就是遍历处理这些 SocketChannel 对象,分别将 Processor 对应的 Selector 注册到这些通道上(对应 OP_READ 事件),用于读取请求数据。

步骤 2 会遍历消费当前 Processor 的响应队列,按照响应的类型分别处理,实现位于 Processor#processNewResponses 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private def processNewResponses() {
// 获取当前 Processor 的响应队列
var curr = requestChannel.receiveResponse(id)
while (curr != null) {
try {
// 依据响应类型对响应进行处理
curr.responseAction match {
// 暂时没有响应需要发送,如果对应的通道未被关闭,则继续注册 OP_READ 事件读取请求数据
case RequestChannel.NoOpAction =>
curr.request.updateRequestMetrics()
trace("Socket server received empty response to send, registering for read: " + curr)
val channelId = curr.request.connectionId
if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
selector.unmute(channelId) // 注册 OP_READ 事件
// 当前响应需要发送给请求方
case RequestChannel.SendAction =>
// 发送该响应,并将响应对象记录到 inflightResponses 集合中
this.sendResponse(curr)
// 需要关闭当前连接
case RequestChannel.CloseConnectionAction =>
curr.request.updateRequestMetrics()
trace("Closing socket connection actively according to the response code.")
// 关闭连接
this.close(selector, curr.request.connectionId)
}
} finally {
// 获取下一个待处理的响应
curr = requestChannel.receiveResponse(id)
}
}
}

我们知道 Processor 本身不负责处理请求,它只是封装请求交由 Handler 线程进行处理,同时每一个 Processor 会维护一个响应队列,Handler 线程在处理完请求之后会将对应的响应对象放置到对应 Processor 的响应队列中,而这一步会遍历处理该响应队列,并依据响应类型分而治之:

  1. 如果当前没有响应需要处理,那么会重新在对应的通道上注册 OP_READ 事件,以继续读取新的请求数据。
  2. 如果当前的响应需要发送给请求方,则会调用 Processor#sendResponse 方法发送响应,并将响应对象记录到 Processor#inflightResponses 字段中,表示该响应对象正在被发送。
  3. 如果当前的响应类型表示需要关闭对应的连接,则会调用 Processor#close 方法关闭对应的通道,并更新对应 IP 上的连接数。

步骤 3 会发送步骤 2 缓存的响应请求,并将读取到的请求、已经发送成功的请求,以及断开的连接分别放置到 Selector 的 completedReceives、completedSends 和 disconnected 集合中,而步骤 4 至 6 的逻辑则分别对应处理这 3 个集合。首先看一下 步骤 4 ,相应实现位于 Processor#processCompletedReceives 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private def processCompletedReceives() {
// 遍历处理接收到的请求
selector.completedReceives.asScala.foreach { receive =>
try {
// 获取请求对应的通道
val openChannel = selector.channel(receive.source)
// 创建通道对应的 Session 对象,用于权限控制
val session = {
// Only methods that are safe to call on a disconnected channel should be invoked on 'channel'.
val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
}
// 封装请求信息为 Request 对象
val req = RequestChannel.Request(
processor = id,
connectionId = receive.source,
session = session,
buffer = receive.payload,
startTimeMs = time.milliseconds,
listenerName = listenerName,
securityProtocol = securityProtocol)
// 将请求对象放入请求队列中,等待 Handler 线程处理
requestChannel.sendRequest(req)
// 取消注册的 OP_READ 事件,处理期间不再接收新的请求(即不读取新的请求数据)
selector.mute(receive.source)
} catch {
case e@(_: InvalidRequestException | _: SchemaException) =>
// note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
error(s"Closing socket for ${receive.source} because of error", e)
close(selector, receive.source)
}
}
}

这一步会遍历处理 Selector 的 completedReceives 集合,对于收到的请求对象会读取请求数据,并封装成 Request 对象记录到请求队列 Processor#requestChannel 中,等待 Handler 线程处理,同时取消之前注册到对应通道的 OP_READ 事件,在处理完成之前不再读取新的请求数据。这里调用了 RequestChannel#sendRequest 方法将 Request 对象放置到一个被 Processor 共享的请求队列中,后续 Handler 线程会消费该队列处理对应的请求。

步骤 5 会遍历处理 Selector 的 completedSends 集合,其中存放了已经发送成功的响应,对于这些响应可以从 Processor#inflightResponses 中移除,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
private def processCompletedSends() {
// 遍历处理已经完全发送出去的请求
selector.completedSends.asScala.foreach { send =>
// 因为当前响应已经发送成功,从 inflightResponses 中移除,不需要客户端确认
val resp = inflightResponses.remove(send.destination).getOrElse {
throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
}
resp.request.updateRequestMetrics()
// 注册 OP_READ 事件,继续读取请求数据
selector.unmute(send.destination)
}
}

步骤 6 会遍历处理 Selector 的 disconnected 集合,对于已经断开的连接,将本地记录的待发送完成的响应对象从 Processor#inflightResponses 中移除,同时更新对应 IP 上的连接数,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
private def processDisconnected() {
// 遍历处理已经断开的连接
selector.disconnected.asScala.foreach { connectionId =>
val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
}.remoteHost
// 将连接对应的所有响应从 inflightResponses 中移除
inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
// 对应的通道已经被关闭,所以需要减少对应 IP 上的连接数
connectionQuotas.dec(InetAddress.getByName(remoteHost))
}
}

Handler 组件

Processor 在将对应的 Request 请求对象记录到全局共享的请求队列之后,Handler 线程会消费该队列并处理对应的请求,同时将处理完成的请求对应的响应对象写入到之前读取该请求的 Processor 的响应队列中。Handler 的实现由 KafkaRequestHandler 和 KafkaRequestHandlerPool 两个类构成,其中 KafkaRequestHandlerPool 是对 KafkaRequestHandler 的封装,提供了对 Handler 线程的管理。KafkaRequestHandlerPool 的实现比较简单,我们主要来看一下 KafkaRequestHandler 的实现。KafkaRequestHandler 实现了 Runnable 接口,其 KafkaRequestHandler#run 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
override def run() {
while (true) {
try {
var req: RequestChannel.Request = null
while (req == null) {
val startSelectTime = time.nanoseconds
// 从请求队列中获取 Processor 封装的请求
req = requestChannel.receiveRequest(300)
val idleTime = time.nanoseconds - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
}

// 如果是 AllDone 请求,则退出当前线程
if (req eq RequestChannel.AllDone) {
debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
return
}
req.requestDequeueTimeMs = time.milliseconds
trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
// 处理请求,将响应写回到对应 Processor 的响应队列中,并唤醒 Processor 线程
apis.handle(req)
} catch {
case e: Throwable => error("Exception when handling request", e)
}
}
}

上述方法诠释了 Handler 的全部运行逻辑,首先调用 RequestChannel#receiveRequest 方法超时等待从全局请求队列中获取请求对象,如果获取到的请求对象是 RequestChannel.AllDone 类型,则说明当前请求退出相应线程,否则 Handler 线程会调用 KafkaApis#handle 方法对请求进行处理,并将响应结果写入到对应 Processor 的响应队列中。

KafkaApis 类是 Kafka 中的一个核心类实现,用于分发各种类型的请求给到相应的组件,针对每一种请求都定义了相应的方法进行处理,上面调用 KafkaApis#handle 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def handle(request: RequestChannel.Request) {
try {
trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
// 依据请求类型分发请求
ApiKeys.forId(request.requestId) match {
// 处理 ProduceRequest 请求
case ApiKeys.PRODUCE => handleProducerRequest(request)
// 处理 FetchRequest 请求
case ApiKeys.FETCH => handleFetchRequest(request)
// 处理 ListOffsetRequest 请求
case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
// 处理 MetadataRequest 请求
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
// 处理 LeaderAndIsrRequest 请求
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
// 处理 StopReplicaRequest 请求
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
// 处理 UpdateMetadataRequest 请求
case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
// 处理 ControlledShutdownRequest 请求
case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
// 处理 OffsetCommitRequest 请求
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
// 处理 OffsetFetchRequest 请求
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
// 处理 GroupCoordinatorRequest 请求
case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
// 处理 JoinGroupRequest 请求
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
// 处理 HeartbeatRequest 请求
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
// 处理 LeaveGroupRequest 请求
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
// 处理 SyncGroupRequest 请求
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
// 处理 DescribeGroupsRequest 请求
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
// 处理 ListGroupsRequest 请求
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
// 处理 SaslHandshakeRequest 请求
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
// 处理 ApiVersionsRequest 请求
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
// 处理 CreateTopicsRequest 请求
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
// 处理 DeleteTopicsRequest 请求
case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
// ... 省略异常处理
} finally
request.apiLocalCompleteTimeMs = time.milliseconds
}

枚举类 ApiKeys 为每一种请求类型定义了一个唯一的标识,KafkaApis 会依据具体的请求类型,将请求委托给对应的 handle* 方法进行处理,这些方法基本的执行逻辑可以概括为:

  1. 解析获取相应类型的请求对象;
  2. 权限校验;
  3. 委托对应的组件处理请求;
  4. 发送响应,或定义响应回调函数,并由具体的组件回调执行。

相应的实现这里先不展开,后续分析具体组件时再针对性介绍。

总结

本文我们分析了 Kafka 的网络交互模型设计与实现,考虑到客户端与集群之间,以及 broker 节点之间的交互均基于请求进行通信,所以必须保证网络交互这一块的低延迟和高性能。相比于传统的“thread-per-connection”线程模型,Kafka 采用了 reactor 模式以满足实际的需求,并借助于 java NIO 进行实现。整个网络交互模型主要分为 Acceptor、Processor 和 Handler 三大组件,其中 Acceptor 负责接收请求,Processor 负责解析请求并发送响应,而具体的请求处理过程则交由 Handler 负责,其中的设计思想值得我们在开发自己的项目中借鉴。