Kafka 源码解析:Broker 节点的启动与关闭
从本篇开始我们分析 Kafka 服务端组件的实现。Kafka 集群由多个 broker 节点构成,每个节点上都运行着一个 Kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。本篇我们主要分析单个 broker 节点上 Kafka 实例的启动和关闭过程,关于集群整体的协调运行机制将在后面按照组件逐一进行分析。
Kafka 提供了 kafka-server-start.sh
脚本来简化服务的启动操作,脚本中通过调用 kafka.Kafka
类来启动 Kafka 服务,这也是 Kafka 整个服务端的驱动类。在 Kafka 服务启动过程中,首先会解析并封装命令行传递的参数,然后创建负责 Kafka 服务启动和关闭操作的 KafkaServerStartable 类对象,并调用 KafkaServerStartable#startup
方法启动服务。
Kafka 驱动类的 main 方法实现如下:
1 | def main(args: Array[String]): Unit = { |
KafkaServerStartable 实际只是对 KafkaServer 的简单封装,相应方法实现都只是简单调用了 KafkaServer 类中同名的方法,所以下文我们主要分析 KafkaServer 类的实现。KafkaServer 是对单个 broker 节点生命周期的描绘,其主要逻辑是用来启动和关闭单个 broker 节点,KafkaServer 类字段定义如下:
1 | class KafkaServer(val config: KafkaConfig, // 配置信息对象 |
在开始分析 KafkaServer 的启动和关闭逻辑之前,我们首先看一下最简单的 KafkaServer#awaitShutdown
方法实现。在 KafkaServer 中定义了一个 CountDownLatch 类型的 KafkaServer#shutdownLatch
字段,初始 count 值设置为 1,而 KafkaServer#awaitShutdown
方法只是简单的调用了 CountDownLatch#await
方法来阻塞主线程。当 KafkaServer#shutdown
方法执行完成后会调用 CountDownLatch#countDown
方法将 count 值设置为 0,从而让主线程从阻塞态中恢复,并最终关闭整个服务。
服务启动过程分析
方法 KafkaServer#shutdown
的实现我们稍后进行分析,下面首先看一下 Kafka 服务的启动过程,即 KafkaServer#startup
方法的实现。该方法实现较长,这里先对方法的整体执行流程进行概括,然后挑一些重点的步骤做进一步分析:
- 运行状态校验,如果当前 broker 节点正在执行关闭操作,则此时不允许再次启动服务,所以抛出异常;如果当前服务已经启动完成,即处于运行状态,则直接返回,不需要重复启动;否则设置正在启动标记;
- 设置当前 broker 节点的状态为 Starting,标识 broker 节点正在启动;
- 初始化定时任务调度器 KafkaScheduler;
- 创建 ZkUtils 工具类对象,用于操作 ZK,期间会在 ZK 上创建一些基本的节点;
- 从 ZK 上获取当前 broker 所属集群的 clusterId,如果不存在则创建一个;
- 获取当前 broker 节点的 brokerId;
- 初始化一些监控相关的配置;
- 创建并启动 LogManager,用于管理记录在本地的日志数据;
- 创建 MetadataCache 对象,用于为当前 broker 节点缓存整个集群中全部分区的状态信息;
- 创建并启动 SocketServer,用于接收并处理来自客户端和其它 broker 节点的请求;
- 创建并启动 ReplicaManager,用于管理当前 broker 节点上的分区副本信息;
- 创建并启动 KafkaController,每个 broker 节点都会创建并启动一个 KafkaController 实例,但是只有一个 broker 会成为 leader 角色,负责管理集群中所有的分区和副本的状态,也是集群与 ZK 进行交互的媒介;
- 创建并启动 GroupCoordinator,负责管理分配给当前 broker 节点的消费者 group 的一个子集;
- 创建并初始化 Authorizer 对象,用于权限管理;
- 创建 KafkaApis 对象,用于分发接收到的各种类型请求;
- 创建 KafkaRequestHandlerPool 线程池对象,用于管理所有 KafkaRequestHandler 线程;
- 创建并启动动态配置管理器,用于监听 ZK 的变更;
- 将自己的 brokerId 注册到 ZK 中(
/brokers/ids/{brokerId}
路径,临时节点),用于标记当前 broker 节点是否存活; - 设置当前 broker 节点的状态为 RunningAsBroker,表示当前 broker 节点已经启动完成,可以对外提供服务;
- 更新相关状态标记,标识当前节点的 Kafka 服务启动完成。
下面针对上述流程中的 2、3、4 和 6 几个步骤做进一步说明,对于流程中涉及到的相关类(LogManager、SocketServer、ReplicaManager、KafkaController,以及 GroupCoordinator 等)的实例化和启动的过程会在后续的文章中针对性的分析。
首先来看一下 步骤 2 ,这一步本身的逻辑比较简单,就是将当前 broker 节点的状态设置为 Starting,标识当前 broker 节点正在执行启动操作。我们主要来看一下 broker 节点的状态定义和状态转换,Kafka 为 broker 节点定义了 6 种状态,如下:
1 | sealed trait BrokerStates { def state: Byte } |
关于每种状态的解释和状态转换图如下:
- NotRunning :初始状态,标识当前 broker 节点未运行。
- Starting :标识当前 broker 节点正在启动中。
- RecoveringFromUncleanShutdown :标识当前 broker 节点正在从上次非正常关闭中恢复。
- RunningAsBroker :标识当前 broker 节点启动成功,可以对外提供服务。
- PendingControlledShutdown :标识当前 broker 节点正在等待 controlled shutdown 操作完成。
- BrokerShuttingDown :标识当前 broker 节点正在执行 shutdown 操作。
所谓 controlled shutdown,实际上是 Kafka 提供的一种友好的关闭 broker 节点的机制。除了因为硬件等原因导致的节点非正常关闭,一些场景下管理员也需要通过命令行发送 ControlledShutdownRequest 请求来主动关闭指定的 broker 节点,例如迁移机房、升级软件,修改 Kafka 配置等。关于 controlled shutdown 机制,我们将在后面分析 KafkaController 组件时再展开分析。
下面继续来看一下 步骤 3 ,KafkaScheduler 是一个基于 ScheduledThreadPoolExecutor 的定时任务调度器实现,实现了 Scheduler 特质:
1 | trait Scheduler { |
其中 startup 和 shutdown 方法分别用于启动和关闭调度器,而 isStarted 方法用于检测当前调度器是否已经启动,方法 schedule 用于注册需要进行周期性调度的任务。
步骤 4 调用了 KafkaServer#initZk
方法创建 ZkUtils 对象,ZkUtils 是对 zkclient 的封装,用于操作 ZK。方法 KafkaServer#initZk
会基于 zookeeper.connect
配置获取对应的 ZK 连接,并在 ZK 上创建一些基本的节点。主要的 ZK 节点包括:
/brokers/ids/{id}
: 记录集群中可用的 broker 的 ID。/brokers/topics/{topic}/partitions
: 记录一个 topic 中所有分区的分配信息,以及 AR 集合。/brokers/topics/{topic}/partitions/{partition_id}/state
: 记录分区 leader 副本所在的 broker 节点 ID、年代信息、ISR 集合,以及 zkVersion 等。/controller
: 记录集群 controller leader 所在 broker 节点的 ID。/controller_epoch
: 记录集群 controller leader 的年代信息。/admin/reassign_partitions
: 记录需要执行副本重新分配的分区。/admin/preferred_replica_election
: 记录需要进行优先副本选举的分区,优先副本是在创建分区时指定的第一个副本。/admin/delete_topics
: 记录待删除的 topic 集合。/isr_change_notification
: 记录一段时间内 ISR 集合发生变化的分区。/config
: 记录一些配置信息。
最后来看一下 步骤 6 获取当前 broker 节点的 brokerId 的过程。我们在启动 Kafka 服务之前可以在配置中通过 broker.id
配置项为当前 broker 节点设置全局唯一的 ID,也可以指定让 Kafka 自动生成。解析 brokerId 的过程位于 KafkaServer#getBrokerId
方法中,实现如下:
1 | private def getBrokerId: Int = { |
在 broker 节点的每个 log 目录下有一个 meta.properties
文件,记录了当前 broker 节点的 ID 和版本信息。如果当前 broker 节点不是第一次启动,那么 Kafka 可以通过该文件约束 broker.id
配置需要前后保持一致。此外,Kafka 还通过该文件保证一个 log 目录不被多个 broker 节点共享。
服务关闭过程分析
Broker 节点在关闭对应的 Kafka 服务时,首先会设置状态为 BrokerShuttingDown,表示正在执行关闭操作,然后开始关闭注册的相关组件,并在这些组件全部关闭成功之后,更新 broker 状态为 NotRunning。相关实现位于 KafkaServer#shutdown
中:
1 | def shutdown() { |
整体执行流程如代码注释,比较简单,相关组件的关闭逻辑我们将在后续文章分析具体组件时再进行介绍。
总结
本文我们主要分析了 Kafka 服务启动和关闭的过程。Kafka 在设计上将各个主要功能模块都拆分成了一个个组件进行实现,服务启动的过程实际上就是实例化并启动各个组件的过程,关闭过程也是如此。到目前为止,我们主要是分析了服务整体启动的执行流程,关于各个组件的启动逻辑,将在后面的文章中分析具体组件时再针对性介绍。