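A Kafka cluster consists of a set of broker nodes. One of these brokers is elected as the leader of all brokers (called the Kafka Controller), and the remaining brokers take the follower role. The Kafka Controller manages the state of all topic partitions and replicas in the cluster, coordinates the operation of all brokers, and is responsible for the interaction between Kafka and ZooKeeper (ZK). Unless stated otherwise, "Kafka Controller" below refers to the broker in the leader role.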
        
           
Kafka defines the KafkaController class to describe the Kafka Controller. Its fields are defined as follows:

    class KafkaController(val config: KafkaConfig, // configuration
                          zkUtils: ZkUtils, // utility class for interacting with ZK
                          val brokerState: BrokerState, // describes the state of the broker node
                          time: Time, // time utility class
                          metrics: Metrics,
                          threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {

        private var isRunning = true
        // controller context
        val controllerContext = new ControllerContext(zkUtils)
        // partition state machine
        val partitionStateMachine = new PartitionStateMachine(this)
        // replica state machine
        val replicaStateMachine = new ReplicaStateMachine(this)
        // controller leader elector
        private val controllerElector = new ZookeeperLeaderElector(
            controllerContext, ZkUtils.ControllerPath, onControllerFailover, onControllerResignation, config.brokerId, time)
        private val autoRebalanceScheduler = new KafkaScheduler(1)
        var deleteTopicManager: TopicDeletionManager = _
        // partition leader selectors
        val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
        private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
        private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
        private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
        // batches requests to brokers
        private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
        // ZK listeners
        private val partitionReassignedListener = new PartitionsReassignedListener(this)
        private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
        private val isrChangeNotificationListener = new IsrChangeNotificationListener(this)

        // ...
    }
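When the Kafka service starts, every broker creates its own KafkaController instance and calls KafkaController#startup to start it:

    def startup(): Unit = {
        inLock(controllerContext.controllerLock) {
            info("Controller starting up")
            // register the listener for ZK session expiration
            this.registerSessionExpirationListener()
            isRunning = true
            // start the failover mechanism (controller election)
            controllerElector.startup
            info("Controller startup complete")
        }
    }

During startup the controller registers a SessionExpirationListener to monitor the state of the connection between the Kafka Controller and ZK, and then starts the failover mechanism. On initial startup this same mechanism is used to elect the leader of the cluster. For now it is enough to understand the startup flow. The ZK listener mechanism and the failover mechanism are analyzed in later sections.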
        
           
The ControllerContext class manages the context information of the Kafka Controller and provides the ability to connect to, and communicate with, all brokers in the cluster. Its fields are defined as follows:

    class ControllerContext(val zkUtils: ZkUtils) {

        // manages the connections to the brokers in the cluster
        var controllerChannelManager: ControllerChannelManager = _
        // IDs of the brokers that are shutting down
        var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
        // controller epoch
        var epoch: Int = KafkaController.InitialControllerEpoch - 1
        // ZK version of the controller epoch
        var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
        // all topics in the cluster
        var allTopics: Set[String] = Set.empty
        // AR set of each partition
        var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
        // leader replica, ISR set and controller epoch of each partition
        var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
        // partitions whose replicas are being reassigned
        val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
        // partitions undergoing preferred replica election
        val partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet
        // live brokers and their IDs
        private var liveBrokersUnderlying: Set[Broker] = Set.empty
        private var liveBrokerIdsUnderlying: Set[Int] = Set.empty

        // ...
    }
ControllerContext provides methods for managing these fields. They are simple, so they are not analyzed here.
Next we focus on the ControllerChannelManager class, which establishes connections to all brokers in the cluster and communicates with them. It defines the ControllerChannelManager#brokerStateInfo field to record the communication-related information for each broker. When instantiated, ControllerChannelManager calls ControllerChannelManager#addNewBroker to initialize brokerStateInfo, constructing one ControllerBrokerStateInfo object per available broker. That object wraps the target broker's node information, a network client, a queue that buffers requests, and a request-sending thread. The ControllerBrokerStateInfo case class is defined as follows:

    case class ControllerBrokerStateInfo(networkClient: NetworkClient,
                                         brokerNode: Node,
                                         messageQueue: BlockingQueue[QueueItem],
                                         requestSendThread: RequestSendThread)

ControllerChannelManager#addNewBroker is implemented as follows:
    private def addNewBroker(broker: Broker) {
        // queue that buffers the requests to be sent to this broker
        val messageQueue = new LinkedBlockingQueue[QueueItem]
        debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
        val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName)
        val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port)
        // network client used to talk to the broker
        val networkClient = {
            val channelBuilder = ChannelBuilders.clientChannelBuilder(
                config.interBrokerSecurityProtocol,
                LoginType.SERVER,
                config.values,
                config.saslMechanismInterBrokerProtocol,
                config.saslInterBrokerHandshakeRequestEnable
            )
            val selector = new Selector(
                NetworkReceive.UNLIMITED,
                Selector.NO_IDLE_TIMEOUT_MS,
                metrics,
                time,
                "controller-channel",
                Map("broker-id" -> broker.id.toString).asJava,
                false,
                channelBuilder
            )
            new NetworkClient(
                selector,
                new ManualMetadataUpdater(Seq(brokerNode).asJava),
                config.brokerId.toString,
                1,
                0,
                Selectable.USE_DEFAULT_BUFFER_SIZE,
                Selectable.USE_DEFAULT_BUFFER_SIZE,
                config.requestTimeoutMs,
                time,
                false
            )
        }
        // name of the request-sending thread
        val threadName = threadNamePrefix match {
            case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)
            case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name, config.brokerId, broker.id)
        }
        // thread that sends the queued requests
        val requestThread = new RequestSendThread(
            config.brokerId, controllerContext, messageQueue, networkClient, brokerNode, config, time, threadName)
        requestThread.setDaemon(false)
        // record the communication info for this broker
        brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
    }
RequestSendThread extends the abstract class ShutdownableThread, whose run loop repeatedly calls doWork, so we focus on the implementation of RequestSendThread#doWork:
    override def doWork(): Unit = {

        def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100))

        // take the next request from the queue, blocking while it is empty
        val QueueItem(apiKey, requestBuilder, callback) = queue.take()
        import NetworkClientBlockingOps._
        var clientResponse: ClientResponse = null
        try {
            lock synchronized {
                var isSendSuccessful = false
                // retry until the send succeeds or the thread is shut down
                while (isRunning.get() && !isSendSuccessful) {
                    try {
                        if (!brokerReady()) {
                            // the broker is not ready yet: back off and retry
                            isSendSuccessful = false
                            backoff()
                        } else {
                            // send the request and block until the response arrives
                            val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder, time.milliseconds(), true)
                            clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
                            isSendSuccessful = true
                        }
                    } catch {
                        // ... (exception handling elided in the original listing)
                    }
                }
                if (clientResponse != null) {
                    val api = ApiKeys.forId(clientResponse.requestHeader.apiKey)
                    // only these three response types are expected
                    if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA_KEY)
                        throw new KafkaException(s"Unexpected apiKey received: $apiKey")
                    val response = clientResponse.responseBody
                    // hand the response to the callback, if there is one
                    if (callback != null) {
                        callback(response)
                    }
                }
            }
        } catch {
            // ... (exception handling elided in the original listing)
        }
    }
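While running, the RequestSendThread keeps consuming the blocking queue that holds the pending requests. Each element is a QueueItem, which wraps the request type, the request object, and the response callback. When a request is available, the thread sends it to the target broker and blocks until the response arrives, then passes the response to the callback. ControllerChannelManager#sendRequest, the method used to send requests, essentially wraps the request information in a QueueItem and puts it on the queue, leaving the RequestSendThread to process it asynchronously.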
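The queue-plus-sender-thread pattern described above can be sketched in isolation. This is a minimal illustration, not Kafka's implementation: SketchQueueItem, SketchSendThread and SketchChannel are hypothetical names, and the network call is replaced by a plain function.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}

// Hypothetical stand-in for QueueItem: a request payload plus a response callback.
final case class SketchQueueItem(request: String, callback: String => Unit)

// Hypothetical sender thread: drains the queue and invokes each callback.
final class SketchSendThread(queue: LinkedBlockingQueue[SketchQueueItem],
                             send: String => String) extends Thread {
  @volatile private var running = true

  override def run(): Unit =
    try {
      while (running) {
        val item = queue.take()            // blocks until a request is queued
        val response = send(item.request)  // stands in for the blocking send/receive
        if (item.callback != null) item.callback(response)
      }
    } catch {
      case _: InterruptedException => ()   // woken up by shutdown()
    }

  def shutdown(): Unit = { running = false; interrupt() }
}

object SketchChannel {
  // Enqueue one request, wait for its callback, and return the response.
  def roundTrip(request: String): String = {
    val queue = new LinkedBlockingQueue[SketchQueueItem]()
    val sender = new SketchSendThread(queue, req => s"ack:$req")
    sender.setDaemon(true)
    sender.start()
    val latch = new CountDownLatch(1)
    var result = ""
    queue.put(SketchQueueItem(request, resp => { result = resp; latch.countDown() }))
    latch.await(5, TimeUnit.SECONDS)
    sender.shutdown()
    result
  }
}
```

The caller only enqueues and returns immediately in the real design; the latch here exists purely so the example can observe the callback.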
        
           
The ControllerBrokerRequestBatch class sends LeaderAndIsrRequest, StopReplicaRequest and UpdateMetadataRequest requests in batches to all available brokers. It defines 3 collections that buffer the information for these 3 request types, and provides methods for adding pending request information and for sending the batch. Its fields are defined as follows:

    class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging {

        val controllerContext: ControllerContext = controller.controllerContext
        val controllerId: Int = controller.config.brokerId
        // pending LeaderAndIsrRequest data, keyed by broker ID
        val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
        // pending StopReplicaRequest data, keyed by broker ID
        val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
        // IDs of the brokers that should receive an UpdateMetadataRequest
        val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
        // pending UpdateMetadataRequest data, keyed by partition
        val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, PartitionStateInfo]

        // ...
    }
ControllerBrokerRequestBatch#newBatch checks whether the 3 collections that buffer request information are empty. If any of them is not, it throws an exception, which means there are still unsent requests and new pending requests may not be added yet. ControllerBrokerRequestBatch#clear empties the 3 collections.
The following 3 methods add request information to the corresponding collection:
- ControllerBrokerRequestBatch#addLeaderAndIsrRequestForBrokers: adds the data needed for a pending LeaderAndIsrRequest to leaderAndIsrRequestMap, and also builds the UpdateMetadataRequest destined for all available brokers, buffering it in updateMetadataRequestPartitionInfoMap until it is sent.
- ControllerBrokerRequestBatch#addStopReplicaRequestForBrokers: adds the data needed for a pending StopReplicaRequest to stopReplicaRequestMap.
- ControllerBrokerRequestBatch#addUpdateMetadataRequestForBrokers: adds the data needed for an UpdateMetadataRequest to updateMetadataRequestPartitionInfoMap.
ControllerBrokerRequestBatch#sendRequestsToBrokers iterates over these 3 collections and calls KafkaController#sendRequest to send the requests, which ultimately relies on the RequestSendThread analyzed above to perform the requests asynchronously.
The methods of ControllerBrokerRequestBatch are straightforward, so they are not analyzed here.
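The newBatch / add / send life cycle can be sketched as follows. This is a simplified illustration that assumes a single buffer instead of three, and SketchRequestBatch and its send function are hypothetical names, not Kafka APIs.

```scala
import scala.collection.mutable

// Hypothetical, simplified batch: one buffer keyed by broker ID.
final class SketchRequestBatch(send: (Int, Seq[String]) => Unit) {
  private val pending = mutable.Map.empty[Int, Vector[String]]

  // Refuse to start a new batch while an earlier one is still buffered.
  def newBatch(): Unit =
    if (pending.nonEmpty)
      throw new IllegalStateException(s"Batch still has unsent requests: $pending")

  // Buffer one request for the given broker.
  def add(brokerId: Int, request: String): Unit =
    pending.update(brokerId, pending.getOrElse(brokerId, Vector.empty) :+ request)

  // Send everything that was buffered, then clear the buffer.
  def sendRequestsToBrokers(): Unit = {
    pending.foreach { case (brokerId, requests) => send(brokerId, requests) }
    pending.clear()
  }
}

object SketchRequestBatchDemo {
  def run(): Map[Int, Seq[String]] = {
    val sent = mutable.Map.empty[Int, Seq[String]]
    val batch = new SketchRequestBatch((id, reqs) => sent.update(id, reqs))
    batch.newBatch()
    batch.add(1, "LeaderAndIsr(t-0)")
    batch.add(1, "LeaderAndIsr(t-1)")
    batch.add(2, "StopReplica(t-0)")
    batch.sendRequestsToBrokers()
    batch.newBatch() // succeeds because the buffer was cleared
    sent.toMap
  }
}
```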
        
           
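PartitionStateMachine defines the partition state machine of the Kafka Controller, which manages the state of the partitions in the cluster. Every Kafka Controller instance has its own partition state machine, but the state machine is only started when that instance becomes the leader. The partition state machine uses the PartitionState trait to define partition states and provides several case objects, each representing one state:
- NewPartition: the state of a newly created partition. The partition may already have an AR set assigned, but it has no ISR set or leader replica yet.
- OnlinePartition: the state of a partition once a leader replica has been elected for it.
- OfflinePartition: the state a partition switches to when its leader replica becomes unavailable.
- NonExistentPartition: describes a partition that does not exist, or one that existed but has since been deleted.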
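The partition state transition diagram is as follows; the transitions it allows match the previous-state checks in PartitionStateMachine#handleStateChange: NonExistentPartition to NewPartition; NewPartition, OnlinePartition or OfflinePartition to OnlinePartition; NewPartition, OnlinePartition or OfflinePartition to OfflinePartition; and OfflinePartition to NonExistentPartition.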
        
The fields of PartitionStateMachine are defined as follows:

    class PartitionStateMachine(controller: KafkaController) extends Logging {

        private val controllerContext = controller.controllerContext
        private val controllerId = controller.config.brokerId
        private val zkUtils = controllerContext.zkUtils
        // state of each partition
        private val partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
        // batches requests to brokers
        private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
        // whether the state machine has been started
        private val hasStarted = new AtomicBoolean(false)
        // default leader selector
        private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
        // ZK listeners
        private val topicChangeListener = new TopicChangeListener(controller)
        private val deleteTopicsListener = new DeleteTopicsListener(controller)
        private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty

        // ...
    }
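When a Kafka Controller instance is elected from follower to leader, it calls PartitionStateMachine#startup to start its partition state machine. The method is implemented as follows:

    def startup() {
        // initialize the state of every partition
        this.initializePartitionState()
        // mark the state machine as started
        hasStarted.set(true)
        // try to move partitions to the OnlinePartition state
        this.triggerOnlinePartitionStateChange()
    }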
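The partition state machine uses the PartitionStateMachine#partitionState field to record the state of all available partitions in the cluster. On startup it initializes this field, that is, the state of every topic partition, and then tries to switch every available partition in the OfflinePartition or NewPartition state to OnlinePartition.
PartitionStateMachine#initializePartitionState iterates over all topic partitions in the cluster and tries to look up each partition's leader replica and ISR set. If that information does not exist, the partition is initialized to NewPartition. Otherwise the method checks whether the broker hosting the leader replica is available: if it is, the partition is initialized to OnlinePartition, and if not, to OfflinePartition. The method is implemented as follows:

    private def initializePartitionState() {
        for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) {
            // look up the leader replica and ISR set of the partition
            controllerContext.partitionLeadershipInfo.get(topicPartition) match {
                // leadership information exists
                case Some(currentLeaderIsrAndEpoch) =>
                    // the broker hosting the leader replica is alive
                    if (controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader))
                        partitionState.put(topicPartition, OnlinePartition)
                    // the broker hosting the leader replica is not alive
                    else
                        partitionState.put(topicPartition, OfflinePartition)
                // no leadership information yet
                case None =>
                    partitionState.put(topicPartition, NewPartition)
            }
        }
    }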
PartitionStateMachine#triggerOnlinePartitionStateChange iterates over all available partitions (excluding partitions of topics queued for deletion) and tries to switch those in the OfflinePartition or NewPartition state to OnlinePartition. The method is implemented as follows:

    def triggerOnlinePartitionStateChange() {
        try {
            // make sure no requests from an earlier batch are still pending
            brokerRequestBatch.newBatch()
            // skip partitions of topics that are queued for deletion
            for ((topicAndPartition, partitionState) <- partitionState
                 if !controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)) {
                // try to bring OfflinePartition and NewPartition partitions online
                if (partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
                    this.handleStateChange(
                        topicAndPartition.topic,
                        topicAndPartition.partition,
                        OnlinePartition,
                        controller.offlinePartitionSelector,
                        (new CallbackBuilder).build)
            }
            // send the batched requests
            brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
        } catch {
            // ... (exception handling elided in the original listing)
        }
    }

The actual partition state switch is performed by PartitionStateMachine#handleStateChange, which is analyzed in the next section.
        
           
The partition state machine defines PartitionStateMachine#handleStateChanges to switch every partition in a given set of topic partitions to a given target state. The method is implemented as follows:

    def handleStateChanges(partitions: Set[TopicAndPartition],
                           targetState: PartitionState,
                           leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector,
                           callbacks: Callbacks = (new CallbackBuilder).build) {
        info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
        try {
            // make sure no requests from an earlier batch are still pending
            brokerRequestBatch.newBatch()
            // switch each partition to the target state
            partitions.foreach { topicAndPartition =>
                this.handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
            }
            // send the batched requests
            brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
        } catch {
            // ... (exception handling elided in the original listing)
        }
    }
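The core of the implementation is PartitionStateMachine#handleStateChange, which was also mentioned when analyzing the startup of the partition state machine. Its implementation is shown below (logging omitted):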
    private def handleStateChange(topic: String, partition: Int,
                                  targetState: PartitionState,
                                  leaderSelector: PartitionLeaderSelector,
                                  callbacks: Callbacks) {
        val topicAndPartition = TopicAndPartition(topic, partition)
        // the state machine must have been started
        if (!hasStarted.get)
            throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
                    "the partition state machine has not started").format(controllerId, controller.epoch, topicAndPartition, targetState))
        // current state of the partition, NonExistentPartition if unknown
        val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
        try {
            // validate the previous state, then switch
            targetState match {
                case NewPartition =>
                    this.assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
                    partitionState.put(topicAndPartition, NewPartition)
                case OnlinePartition =>
                    this.assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
                    partitionState(topicAndPartition) match {
                        case NewPartition =>
                            // assign the leader replica and ISR set
                            this.initializeLeaderAndIsrForPartition(topicAndPartition)
                        case OfflinePartition =>
                            // elect a new leader replica
                            this.electLeaderForPartition(topic, partition, leaderSelector)
                        case OnlinePartition =>
                            // re-elect the leader replica
                            this.electLeaderForPartition(topic, partition, leaderSelector)
                        case _ =>
                    }
                    partitionState.put(topicAndPartition, OnlinePartition)
                case OfflinePartition =>
                    this.assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
                    partitionState.put(topicAndPartition, OfflinePartition)
                case NonExistentPartition =>
                    this.assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
                    partitionState.put(topicAndPartition, NonExistentPartition)
            }
        } catch {
            // ... (exception handling elided in the original listing)
        }
    }
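The overall approach is to validate the current state of the partition against the target state, ensuring that the current state is a legal predecessor of the target state. PartitionStateMachine#assertValidPreviousStates performs this check and throws an exception if the previous state is not legal.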
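The previous-state check can be sketched as a lookup table. The table below copies the legal predecessors passed to assertValidPreviousStates in the listing above; the Sketch-prefixed names are hypothetical, and the sketch returns a Boolean where the real method throws.

```scala
// Hypothetical mirror of the PartitionState case objects.
sealed trait SketchPartitionState
case object SketchNew extends SketchPartitionState
case object SketchOnline extends SketchPartitionState
case object SketchOffline extends SketchPartitionState
case object SketchNonExistent extends SketchPartitionState

object SketchTransitions {
  private def states(s: SketchPartitionState*): Set[SketchPartitionState] = s.toSet

  // target state -> legal previous states, as in handleStateChange
  val validPrevious: Map[SketchPartitionState, Set[SketchPartitionState]] =
    Map[SketchPartitionState, Set[SketchPartitionState]](
      SketchNew -> states(SketchNonExistent),
      SketchOnline -> states(SketchNew, SketchOnline, SketchOffline),
      SketchOffline -> states(SketchNew, SketchOnline, SketchOffline),
      SketchNonExistent -> states(SketchOffline)
    )

  // True if switching from `current` to `target` is a legal transition.
  def isValid(current: SketchPartitionState, target: SketchPartitionState): Boolean =
    validPrevious(target).contains(current)
}
```

Note that, per the table, an online partition cannot be deleted directly: it has to go through OfflinePartition first.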
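If the target state is NewPartition, OfflinePartition or NonExistentPartition, the switch is simple. The interesting case is a target state of OnlinePartition, which has 3 scenarios depending on the previous state:
1. If the previous state is NewPartition, a leader replica and an ISR set must be assigned to the topic partition.
2. If the previous state is OfflinePartition, a new leader replica must be elected for the topic partition.
3. If the previous state is OnlinePartition, the leader replica of the topic partition must be re-elected.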
 
Scenarios 2 and 3 are implemented by PartitionStateMachine#electLeaderForPartition, which is covered later when analyzing the partition leader election mechanism. Scenario 1 is implemented by PartitionStateMachine#initializeLeaderAndIsrForPartition:
    private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
        // AR set of the partition
        val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
        // replicas in the AR set whose broker is alive
        val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
        liveAssignedReplicas.size match {
            // no live replica: fail the state change
            case 0 =>
                val failMsg = "encountered error during state change of partition %s from New to Online, assigned replicas are [%s], live brokers are [%s]. No assigned replica is alive."
                        .format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds)
                stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
                throw new StateChangeFailedException(failMsg)
            case _ =>
                debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
                // the first live replica becomes the leader
                val leader = liveAssignedReplicas.head
                // all live replicas form the ISR set
                val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList), controller.epoch)
                debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch))
                try {
                    // record the leader and ISR in ZK
                    zkUtils.createPersistentPath(
                        getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
                        zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
                    // update the controller context
                    controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
                    // queue a LeaderAndIsrRequest for the live replicas
                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
                        liveAssignedReplicas, topicAndPartition.topic, topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
                } catch {
                    // ... (exception handling elided in the original listing)
                }
        }
    }
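For a topic partition in the NewPartition state, the first available replica of the partition is chosen as the leader replica and all available replicas are added to the ISR set. This information is then recorded in ZK, and a LeaderAndIsrRequest is queued for the corresponding brokers so that they switch replica roles.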
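The selection rule itself is small enough to express as a pure function. The sketch below uses hypothetical names (InitialLeaderAndIsr, SketchInitialLeader) and returns None where the real method throws StateChangeFailedException.

```scala
// Hypothetical result type: the chosen leader and the initial ISR set.
final case class InitialLeaderAndIsr(leader: Int, isr: List[Int])

object SketchInitialLeader {
  // First live replica in AR order becomes leader; all live replicas form the ISR.
  def choose(assignedReplicas: Seq[Int], liveBrokerIds: Set[Int]): Option[InitialLeaderAndIsr] = {
    val live = assignedReplicas.filter(liveBrokerIds.contains)
    live.headOption.map(leader => InitialLeaderAndIsr(leader, live.toList))
  }
}
```

For example, with an AR set of 3, 1, 2 and brokers 1 and 2 alive, replica 1 becomes the leader and the ISR is 1, 2.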
        
           
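The partition state machine defines PartitionStateMachine#electLeaderForPartition, which selects a leader replica for a given topic partition using a given partition leader selector. The method is implemented as follows: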
    def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
        val topicAndPartition = TopicAndPartition(topic, partition)
        try {
            var zookeeperPathUpdateSucceeded: Boolean = false
            var newLeaderAndIsr: LeaderAndIsr = null
            var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
            // retry until the ZK update succeeds
            while (!zookeeperPathUpdateSucceeded) {
                // read the current leader, ISR and controller epoch of the partition
                val currentLeaderIsrAndEpoch = this.getLeaderIsrAndEpochOrThrowException(topic, partition)
                val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
                val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
                // the recorded controller epoch is newer than this controller's epoch
                if (controllerEpoch > controller.epoch) {
                    // ... (elided in the original listing)
                }
                // select the new leader replica and ISR set
                val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
                // conditionally write the new leader and ISR to ZK
                val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(
                    zkUtils, topic, partition, leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
                newLeaderAndIsr = leaderAndIsr
                newLeaderAndIsr.zkVersion = newVersion
                zookeeperPathUpdateSucceeded = updateSucceeded
                replicasForThisPartition = replicas
            }
            val newLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
            // update the controller context
            controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
            // AR set of the partition
            val replicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
            // queue a LeaderAndIsrRequest for the replicas returned by the selector
            brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderIsrAndControllerEpoch, replicas)
        } catch {
            // ... (exception handling elided in the original listing)
        }
        debug("After leader election, leader cache is updated to %s".format(controllerContext.partitionLeadershipInfo.map(l => (l._1, l._2))))
    }
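During a partition leader election, the given partition leader selector chooses the new leader replica for the topic partition and returns the new ISR set. Because the state of the partition has changed, the updated information is written to ZK, retrying until the conditional update succeeds. A LeaderAndIsrRequest is then queued for the brokers hosting the replicas returned by the selector, telling them to switch replica roles. As noted for addLeaderAndIsrRequestForBrokers, an UpdateMetadataRequest is also queued so that all available brokers update their locally cached cluster metadata.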
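The read, select, conditionally-write loop is an optimistic concurrency pattern. The sketch below illustrates it against an in-memory versioned node rather than ZK; SketchVersionedNode, VersionedValue and SketchElectionLoop are hypothetical names, and returning -1 as the version on failure is an assumption of this sketch.

```scala
import java.util.concurrent.atomic.AtomicReference

final case class VersionedValue(value: String, version: Int)

// Hypothetical in-memory stand-in for a ZK node that carries a data version.
final class SketchVersionedNode(initial: String) {
  private val ref = new AtomicReference(VersionedValue(initial, 0))

  def read(): VersionedValue = ref.get()

  // Conditional update: succeeds only if expectedVersion still matches.
  def conditionalUpdate(newValue: String, expectedVersion: Int): (Boolean, Int) = {
    val current = ref.get()
    if (current.version == expectedVersion &&
        ref.compareAndSet(current, VersionedValue(newValue, expectedVersion + 1)))
      (true, expectedVersion + 1)
    else
      (false, -1)
  }
}

object SketchElectionLoop {
  // Re-read, re-select and retry until the conditional write succeeds.
  def elect(node: SketchVersionedNode, select: String => String): VersionedValue = {
    var updated = false
    var result = node.read()
    while (!updated) {
      val current = node.read()
      val chosen = select(current.value)
      val (ok, newVersion) = node.conditionalUpdate(chosen, current.version)
      if (ok) result = VersionedValue(chosen, newVersion)
      updated = ok
    }
    result
  }
}
```

Re-running the selector inside the loop matters: if another writer changed the node between the read and the write, the selection is redone against the fresh state.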
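The PartitionLeaderSelector trait is the abstraction of a partition leader selector. It is defined as follows:

    trait PartitionLeaderSelector {

        /**
         * Selects the leader replica for the given partition.
         * Returns the new leader and ISR, together with the replicas
         * that should receive the LeaderAndIsrRequest.
         */
        def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
    }

Kafka currently defines 5 partition leader selection strategies based on the PartitionLeaderSelector trait: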
NoOpLeaderSelector 
OfflinePartitionLeaderSelector 
ReassignedPartitionLeaderSelector 
PreferredReplicaPartitionLeaderSelector 
ControlledShutdownLeaderSelector 
 
NoOpLeaderSelector is the simplest. It does no real work and simply returns the current leader replica and ISR set that were passed in. The remaining 4 strategies are analyzed below.
        
           
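OfflinePartitionLeaderSelector tries to select the new leader replica from the ISR set. If the ISR set contains no available replica, it tries to select the new leader from the AR set, provided the configuration allows it. The strategy is implemented as follows: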
    def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
        controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
            // the partition has an AR set
            case Some(assignedReplicas) =>
                // live replicas in the AR set
                val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
                // live replicas in the ISR set
                val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
                val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
                val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
                val newLeaderAndIsr =
                    if (liveBrokersInIsr.isEmpty) {
                        // no live replica in the ISR set: check whether unclean election is allowed
                        if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(
                            controllerContext.zkUtils, ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
                            // ... (elided in the original listing)
                        }
                        debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s".format(topicAndPartition, liveAssignedReplicas.mkString(",")))
                        if (liveAssignedReplicas.isEmpty) {
                            // ... (elided in the original listing)
                        } else {
                            // pick the first live replica in the AR set; data may be lost
                            ControllerStats.uncleanLeaderElectionRate.mark()
                            val newLeader = liveAssignedReplicas.head
                            warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
                                    .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
                            new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
                        }
                    } else {
                        val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
                        // pick the first live replica that is in the ISR set
                        val newLeader = liveReplicasInIsr.head
                        debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader.".format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
                        // the live ISR replicas form the new ISR set
                        new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr, currentLeaderIsrZkPathVersion + 1)
                    }
                info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
                // the live AR replicas receive the LeaderAndIsrRequest
                (newLeaderAndIsr, liveAssignedReplicas)
            // the partition has no AR set
            case None =>
                throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
        }
    }
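If the new leader replica is selected from the ISR set, the available replicas of the ISR set become the new ISR set. Selecting the new leader from the AR set requires unclean.leader.election.enable=true, and in that case the new ISR set contains only the leader replica.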
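The two branches can be summarized as a pure function. This is a sketch under simplifying assumptions: OfflineChoice and SketchOfflineSelector are hypothetical names, epochs and ZK versions are left out, and None is returned wherever no leader can be chosen.

```scala
// Hypothetical result: chosen leader, new ISR, and whether the election was unclean.
final case class OfflineChoice(leader: Int, isr: List[Int], unclean: Boolean)

object SketchOfflineSelector {
  def select(assigned: Seq[Int], isr: List[Int], live: Set[Int],
             uncleanEnabled: Boolean): Option[OfflineChoice] = {
    val liveAssigned = assigned.filter(live.contains)
    val liveIsr = isr.filter(live.contains)
    if (liveIsr.nonEmpty)
      // clean election: first live AR replica that is also in the ISR
      liveAssigned.find(liveIsr.contains).map(l => OfflineChoice(l, liveIsr, unclean = false))
    else if (uncleanEnabled)
      // unclean election: first live AR replica, ISR shrinks to the leader alone
      liveAssigned.headOption.map(l => OfflineChoice(l, List(l), unclean = true))
    else
      None
  }
}
```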
        
           
ReassignedPartitionLeaderSelector is used when replicas are being reassigned. It selects as the new leader a replica that is both in the newly assigned AR set and in the ISR set, and keeps the previous ISR set as the new ISR set. The strategy is implemented as follows:

    def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
        // newly assigned AR set of the partition
        val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
        val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
        val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
        // replicas of the new AR set that are alive and in the ISR set
        val aliveReassignedInSyncReplicas = reassignedInSyncReplicas
                .filter(r => controllerContext.liveBrokerIds.contains(r) && currentLeaderAndIsr.isr.contains(r))
        val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
        newLeaderOpt match {
            // a qualifying replica exists: make it the leader and keep the ISR set
            case Some(newLeader) =>
                (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr, currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
            // no qualifying replica
            case None =>
                reassignedInSyncReplicas.size match {
                    case 0 =>
                        throw new NoReplicaOnlineException("List of reassigned replicas for partition %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
                    case _ =>
                        throw new NoReplicaOnlineException("None of the reassigned replicas for partition %s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
                }
        }
    }

As the implementation shows, an exception is thrown if no replica satisfies the conditions.
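A compact sketch of the same rule, with the two failure cases reported as Left values instead of exceptions. SketchReassignedSelector and its error strings are hypothetical.

```scala
object SketchReassignedSelector {
  // Returns (leader, ISR) or a description of why no leader could be chosen.
  def select(newReplicas: Seq[Int], isr: List[Int], live: Set[Int]): Either[String, (Int, List[Int])] =
    newReplicas.find(r => live.contains(r) && isr.contains(r)) match {
      case Some(leader) => Right((leader, isr)) // ISR set is kept unchanged
      case None if newReplicas.isEmpty => Left("reassigned replica list is empty")
      case None => Left("no reassigned replica is alive and in sync")
    }
}
```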
        
           
PreferredReplicaPartitionLeaderSelector tries to select the preferred replica (the first replica in the AR set) as the leader replica, provided that replica is in the ISR set, and keeps the current ISR set as the new ISR set. The strategy is implemented as follows:

    def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
        // AR set of the partition
        val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
        // the preferred replica is the first replica in the AR set
        val preferredReplica = assignedReplicas.head
        val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
        if (currentLeader == preferredReplica) {
            // the preferred replica is already the leader
            throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s".format(preferredReplica, topicAndPartition))
        } else {
            info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) + " Triggering preferred replica leader election")
            // the preferred replica must be alive and in the ISR set
            if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
                (new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr, currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
            } else {
                throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) + "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
            }
        }
    }

As the implementation shows, an exception is thrown if the broker hosting the preferred replica is unavailable or if the preferred replica is not in the ISR set. An exception is also thrown when the preferred replica is already the leader, because no election is needed.
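The three outcomes can be modeled as a small sum type. This is a sketch with hypothetical names (PreferredOutcome and its cases); the real selector signals the first and third outcomes with exceptions.

```scala
sealed trait PreferredOutcome
case object AlreadyPreferred extends PreferredOutcome
final case class ElectPreferred(leader: Int, isr: List[Int]) extends PreferredOutcome
final case class PreferredUnavailable(replica: Int) extends PreferredOutcome

object SketchPreferredSelector {
  // Assumes `assigned` is non-empty; its head is the preferred replica.
  def select(assigned: Seq[Int], currentLeader: Int, isr: List[Int], live: Set[Int]): PreferredOutcome = {
    val preferred = assigned.head
    if (currentLeader == preferred) AlreadyPreferred
    else if (live.contains(preferred) && isr.contains(preferred)) ElectPreferred(preferred, isr)
    else PreferredUnavailable(preferred)
  }
}
```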
        
           
ControlledShutdownLeaderSelector removes from the ISR set the replicas whose brokers are shutting down, uses the remaining replicas as the new ISR set, and selects one of them as the new leader replica. The strategy is implemented as follows:

    def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
        val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
        val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
        val currentLeader = currentLeaderAndIsr.leader
        val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
        // AR replicas whose broker is alive or shutting down
        val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
        val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
        // remove the replicas on shutting-down brokers from the ISR set
        val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
        // pick the first AR replica that is in the new ISR set
        liveAssignedReplicas.find(newIsr.contains) match {
            case Some(newLeader) =>
                debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader))
                (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)
            case None =>
                throw new StateChangeFailedException("No other replicas in ISR %s for %s besides shutting down brokers %s".format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
        }
    }
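A pure-function sketch of the same rule follows. SketchControlledShutdownSelector is a hypothetical name, epochs and ZK versions are omitted, and None replaces the StateChangeFailedException.

```scala
object SketchControlledShutdownSelector {
  // Returns the new leader and the shrunken ISR, or None if no candidate remains.
  def select(assigned: Seq[Int], isr: List[Int],
             liveOrShuttingDown: Set[Int], shuttingDown: Set[Int]): Option[(Int, List[Int])] = {
    val liveAssigned = assigned.filter(liveOrShuttingDown.contains)
    val newIsr = isr.filterNot(shuttingDown.contains) // drop replicas on shutting-down brokers
    liveAssigned.find(newIsr.contains).map(leader => (leader, newIsr))
  }
}
```

For example, with AR and ISR both 1, 2, 3 and broker 1 shutting down, replica 2 becomes the leader and the ISR shrinks to 2, 3.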
           
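ReplicaStateMachine defines the replica state machine of the Kafka Controller and manages the state of the replicas in the cluster. Every Kafka Controller defines its own replica state machine, but the state machine is started only when the current Controller instance becomes the leader. The replica state machine uses the ReplicaState trait to define replica states and provides several case objects that implement it, each representing a different replica state. The replica state case objects are:
NewReplica: the state of a newly created replica; a replica in this state can only be a follower.
OnlineReplica: the state a replica enters once it becomes a member of the AR set; the replica may be either the leader or a follower.
OfflineReplica: the state of a replica after the broker hosting it goes down.
ReplicaDeletionStarted: the state a replica is switched to when its deletion begins, before the deletion is actually carried out.
ReplicaDeletionSuccessful: the state of a replica after it has been deleted successfully.
ReplicaDeletionIneligible: the state of a replica after its deletion has failed.
NonExistentReplica: the final state of a replica that has been deleted successfully.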
        
The fields of ReplicaStateMachine are defined as follows:
class ReplicaStateMachine(controller: KafkaController) extends Logging {
    // Controller context
    private val controllerContext = controller.controllerContext
    // ID of the broker hosting this controller
    private val controllerId = controller.config.brokerId
    // ZK utility class
    private val zkUtils = controllerContext.zkUtils
    // State of every replica in the cluster
    private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
    // ZK listener for broker changes
    private val brokerChangeListener = new BrokerChangeListener(controller)
    // Batches the requests sent to brokers
    private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
    // Whether the state machine has been started
    private val hasStarted = new AtomicBoolean(false)
    // ...
}
When a Kafka Controller instance is elected from follower to leader, it calls ReplicaStateMachine#startup to start the corresponding replica state machine. The method is implemented as follows:
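def startup() {
    // Initialize the locally recorded state of every replica
    this.initializeReplicaState()
    // Mark the state machine as started
    hasStarted.set(true)
    // Try to switch all live replicas to the OnlineReplica state
    this.handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
}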
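The replica state machine uses the ReplicaStateMachine#replicaState field to record the state of every replica in the cluster. The field is initialized at startup: replicas on live brokers are recorded as OnlineReplica and replicas on unavailable brokers as ReplicaDeletionIneligible, after which the state machine tries to switch all live replicas to the OnlineReplica state.
ReplicaStateMachine#initializeReplicaState iterates over the AR set of every topic partition and initializes the locally recorded replica state according to whether the broker hosting each replica is available. The method is implemented as follows: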
private def initializeReplicaState() {
    for ((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
        val topic = topicPartition.topic
        val partition = topicPartition.partition
        // Walk the AR set of the partition
        assignedReplicas.foreach { replicaId =>
            val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
            // The broker hosting the replica is alive: initialize to OnlineReplica
            if (controllerContext.liveBrokerIds.contains(replicaId)) replicaState.put(partitionAndReplica, OnlineReplica)
            // Otherwise initialize to ReplicaDeletionIneligible
            else replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
        }
    }
}
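The same initialization rule can be expressed as a pure function, which may be easier to reason about. The sketch below is an illustration only: it represents partitions as `(topic, partition)` tuples and states as plain strings, neither of which matches Kafka's real types.

```scala
object InitialReplicaStateSketch {
  // assignment: (topic, partition) -> AR set as broker ids
  // result:     (topic, partition, replicaId) -> initial state name
  def initialStates(assignment: Map[(String, Int), Seq[Int]],
                    liveBrokerIds: Set[Int]): Map[(String, Int, Int), String] =
    assignment.toSeq.flatMap { case ((topic, partition), replicas) =>
      replicas.map { replicaId =>
        // Replicas on live brokers start as OnlineReplica, all others as ReplicaDeletionIneligible
        val state =
          if (liveBrokerIds.contains(replicaId)) "OnlineReplica"
          else "ReplicaDeletionIneligible"
        (topic, partition, replicaId) -> state
      }
    }.toMap
}
```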
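ReplicaStateMachine#handleStateChanges iterates over the given replicas (at startup, all live replicas) and tries to switch each of them to the target state (at startup, OnlineReplica). The method is implemented as follows: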
def handleStateChanges(replicas: Set[PartitionAndReplica],
                       targetState: ReplicaState,
                       callbacks: Callbacks = (new CallbackBuilder).build) {
    if (replicas.nonEmpty) {
        info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
        try {
            // Start a new batch of requests
            brokerRequestBatch.newBatch()
            // Switch the state of each replica in turn
            replicas.foreach(r => this.handleStateChange(r, targetState, callbacks))
            // Send the batched requests to the brokers
            brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
        } catch {
            case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
        }
    }
}
The actual state switch is delegated to ReplicaStateMachine#handleStateChange, the core method of ReplicaStateMachine. It is implemented as follows (some logging is omitted):
def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState, callbacks: Callbacks) {
    val topic = partitionAndReplica.topic
    val partition = partitionAndReplica.partition
    val replicaId = partitionAndReplica.replica
    val topicAndPartition = TopicAndPartition(topic, partition)
    // The state machine must have been started
    if (!hasStarted.get)
        throw new StateChangeFailedException(
            "Controller %d epoch %d initiated state change of replica %d for partition %s to %s failed because replica state machine has not started"
                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
    // Current state of the replica, NonExistentReplica if none is recorded
    val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
    try {
        // AR set of the partition
        val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
        targetState match {
            case NewReplica =>
                // Valid previous state: NonExistentReplica
                this.assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
                // Read the leader replica and ISR of the partition from ZK
                val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
                leaderIsrAndControllerEpochOpt match {
                    case Some(leaderIsrAndControllerEpoch) =>
                        // A replica in the NewReplica state cannot be the leader
                        if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
                            throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
                                    .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
                        // Queue a LeaderAndIsrRequest for the broker hosting the replica
                        brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
                            List(replicaId), topic, partition, leaderIsrAndControllerEpoch, replicaAssignment)
                    case None => // no leader information yet, nothing to send
                }
                replicaState.put(partitionAndReplica, NewReplica)
            case ReplicaDeletionStarted =>
                // Valid previous state: OfflineReplica
                this.assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
                replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
                // Queue a StopReplicaRequest that also deletes the replica
                brokerRequestBatch.addStopReplicaRequestForBrokers(
                    List(replicaId), topic, partition, deletePartition = true, callbacks.stopReplicaResponseCallback)
            case ReplicaDeletionIneligible =>
                // Valid previous state: ReplicaDeletionStarted
                this.assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
                replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
            case ReplicaDeletionSuccessful =>
                // Valid previous state: ReplicaDeletionStarted
                this.assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
                replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
            case NonExistentReplica =>
                // Valid previous state: ReplicaDeletionSuccessful
                this.assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
                // Remove the replica from the AR set of the partition
                val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
                controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
                // Forget the state of the replica
                replicaState.remove(partitionAndReplica)
            case OnlineReplica =>
                // Valid previous states: NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible
                this.assertValidPreviousStates(partitionAndReplica,
                    List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
                replicaState(partitionAndReplica) match {
                    case NewReplica =>
                        // Add the new replica to the AR set if it is not there yet
                        val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
                        if (!currentAssignedReplicas.contains(replicaId))
                            controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
                    case _ =>
                        controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
                            // A leader exists: queue a LeaderAndIsrRequest for the broker hosting the replica
                            case Some(leaderIsrAndControllerEpoch) =>
                                brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
                                    List(replicaId), topic, partition, leaderIsrAndControllerEpoch, replicaAssignment)
                                replicaState.put(partitionAndReplica, OnlineReplica)
                            // No leader yet: nothing to send
                            case None =>
                        }
                }
                replicaState.put(partitionAndReplica, OnlineReplica)
            case OfflineReplica =>
                // Valid previous states: NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible
                this.assertValidPreviousStates(partitionAndReplica,
                    List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
                // Queue a StopReplicaRequest that keeps the replica data
                brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
                val leaderAndIsrIsEmpty: Boolean =
                    controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
                        case Some(_) =>
                            // Remove the replica from the ISR of the partition
                            controller.removeReplicaFromIsr(topic, partition, replicaId) match {
                                case Some(updatedLeaderIsrAndControllerEpoch) =>
                                    val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
                                    if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
                                        // Notify the remaining replicas of the updated leader and ISR
                                        brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
                                            topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                                    }
                                    replicaState.put(partitionAndReplica, OfflineReplica)
                                    false
                                case None =>
                                    true
                            }
                        case None =>
                            true
                    }
                if (leaderAndIsrIsEmpty && !controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition))
                    throw new StateChangeFailedException(
                        "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty".format(replicaId, topicAndPartition))
        }
    }
    catch {
        // ... (exception handling omitted)
    }
}
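Like PartitionStateMachine#handleStateChange analyzed earlier, the replica state switch first validates the current replica state against the target state, making sure the current state is a valid previous state of the target. ReplicaStateMachine#assertValidPreviousStates performs this check and throws an exception if the previous state is invalid. The specific switching logic is described by the comments in the method above. It follows the same idea as the partition state switch covered earlier, so it is not analyzed further here.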
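The valid previous states can be read off the `assertValidPreviousStates` calls in the method above and collected into a single table. The sketch below is only an illustration of that table: the state objects mirror the names used in the text, but `validPreviousStates` and `isValidTransition` are hypothetical helpers, not part of Kafka.

```scala
object ReplicaStateSketch {
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  private def states(s: ReplicaState*): Set[ReplicaState] = s.toSet

  // target state -> states from which the target may be entered
  val validPreviousStates: Map[ReplicaState, Set[ReplicaState]] = Map(
    NewReplica -> states(NonExistentReplica),
    OnlineReplica -> states(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
    OfflineReplica -> states(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
    ReplicaDeletionStarted -> states(OfflineReplica),
    ReplicaDeletionSuccessful -> states(ReplicaDeletionStarted),
    ReplicaDeletionIneligible -> states(ReplicaDeletionStarted),
    NonExistentReplica -> states(ReplicaDeletionSuccessful)
  )

  def isValidTransition(from: ReplicaState, to: ReplicaState): Boolean =
    validPreviousStates(to).contains(from)
}
```

Read this way, a replica whose deletion failed (ReplicaDeletionIneligible) has to go back through OfflineReplica before deletion can be started again.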
        
           
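TopicDeletionManager deletes the topics specified by the administrator. It defines the DeleteTopicsThread thread, which deletes the set of pending topics asynchronously. The fields of TopicDeletionManager are defined as follows: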
class TopicDeletionManager(controller: KafkaController,
                           initialTopicsToBeDeleted: Set[String] = Set.empty,
                           initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {
    // Controller context
    val controllerContext: ControllerContext = controller.controllerContext
    // Partition state machine
    val partitionStateMachine: PartitionStateMachine = controller.partitionStateMachine
    // Replica state machine
    val replicaStateMachine: ReplicaStateMachine = controller.replicaStateMachine
    // Condition used to wake up the deletion thread
    val deleteTopicsCond: Condition = deleteLock.newCondition()
    // Whether the set of topics to delete has changed
    val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
    // Background thread that performs the deletion
    var deleteTopicsThread: DeleteTopicsThread = _
    // Whether topic deletion is enabled
    val isDeleteTopicEnabled: lang.Boolean = controller.config.deleteTopicEnable
    // Topics to be deleted
    val topicsToBeDeleted: mutable.Set[String] = if (isDeleteTopicEnabled) {
        mutable.Set.empty[String] ++ initialTopicsToBeDeleted
    } else {
        // Deletion is disabled: remove the pending topics from ZK instead
        val zkUtils = controllerContext.zkUtils
        for (topic <- initialTopicsToBeDeleted) {
            val deleteTopicPath = getDeleteTopicPath(topic)
            info("Removing " + deleteTopicPath + " since delete topic is disabled")
            zkUtils.zkClient.delete(deleteTopicPath)
        }
        mutable.Set.empty[String]
    }
    // Topics that cannot be deleted for now
    val topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++ (initialTopicsIneligibleForDeletion & topicsToBeDeleted)
    // Partitions to be deleted
    val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
    // ...
}
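Kafka provides the delete.topic.enable configuration option to control whether topic deletion is enabled. If it is disabled, no deletion is actually performed; instead, the topics marked for deletion are removed from under the /admin/delete_topics node. In addition, as the field definitions above show, TopicDeletionManager also maintains the set of topics that cannot be deleted. A topic is considered temporarily ineligible for deletion when any of the following holds:
A partition of the topic is undergoing replica reassignment.
A partition of the topic is undergoing preferred replica election.
A replica of the topic is unavailable, that is, the broker hosting it is down.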
 
When a Kafka Controller instance becomes the leader, it calls TopicDeletionManager#start to start the topic deletion mechanism. The method mainly starts the background deletion thread DeleteTopicsThread:
def start() {
    if (isDeleteTopicEnabled) {
        deleteTopicsThread = new DeleteTopicsThread()
        // There are topics waiting to be deleted, so flag the change
        if (topicsToBeDeleted.nonEmpty) deleteTopicStateChanged.set(true)
        // Start the background deletion thread
        deleteTopicsThread.start()
    }
}
DeleteTopicsThread extends the ShutdownableThread abstract class. Its DeleteTopicsThread#doWork method is implemented as follows:
override def doWork() {
    // Wait until there is deletion work to do
    awaitTopicDeletionNotification()
    if (!isRunning.get) return
    inLock(controllerContext.controllerLock) {
        // Snapshot of the topics queued for deletion
        val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
        topicsQueuedForDeletion.foreach { topic =>
            // All replicas of the topic have been deleted successfully
            if (controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
                // Clean up after the deleted topic
                completeDeleteTopic(topic)
                info("Deletion of topic %s successfully completed".format(topic))
            }
            else {
                if (controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
                    // Deletion is still in progress for at least one replica: nothing to do yet
                }
                else {
                    // At least one replica failed to be deleted: mark the topic for retry
                    if (controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
                        markTopicForDeletionRetry(topic)
                    }
                }
            }
            // Start (or restart) deletion if the topic is eligible
            if (isTopicEligibleForDeletion(topic)) {
                info("Deletion of topic %s (re)started".format(topic))
                onTopicDeletion(Set(topic))
            } else if (isTopicIneligibleForDeletion(topic)) {
                info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
            }
        }
    }
}
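The topic deletion flow can be summarized as follows:
1. Get the set of topics to be deleted.
2. If all replicas of a topic have been deleted successfully, update the state of the topic and of its partitions and replicas, and remove the topic's information from ZK and from the Controller context.
3. Otherwise, if any replica of the topic is in the deletion-started state (ReplicaDeletionStarted), skip the topic and move on to the others.
4. Otherwise, if any replica of the topic is in the deletion-failed state (ReplicaDeletionIneligible), try to reset the state of those replicas to OfflineReplica so that deletion can be retried later.
5. Check whether the topic can currently be deleted and, if so, start the deletion.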
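The per-topic decision made in the second, third and fourth steps can be sketched as a pure function. This is an illustration under simplifying assumptions: replica states are passed as plain strings, and the `Action` values are hypothetical names for the branches of `doWork`, not Kafka types.

```scala
object TopicDeletionFlowSketch {
  sealed trait Action
  case object CompleteDeletion extends Action         // all replicas deleted: run the cleanup
  case object WaitForInFlightDeletion extends Action  // some replica is still being deleted
  case object MarkForRetry extends Action             // some replica failed: reset and retry later
  case object NoOp extends Action

  // replicaStates: the state names of all replicas of one topic
  def nextAction(replicaStates: Seq[String]): Action =
    if (replicaStates.forall(_ == "ReplicaDeletionSuccessful")) CompleteDeletion
    else if (replicaStates.contains("ReplicaDeletionStarted")) WaitForInFlightDeletion
    else if (replicaStates.contains("ReplicaDeletionIneligible")) MarkForRetry
    else NoOp
}
```

The order of the checks matters: a topic with one failed replica and one replica still being deleted waits rather than retrying immediately.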
 
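Let's look more closely at steps 2 and 5. Step 2 performs cleanup for a topic that has been deleted successfully, including deregistering ZK listeners, switching the state of its partitions and replicas, and clearing the topic's data from ZK and from the Controller context. It is implemented as follows: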
private def completeDeleteTopic(topic: String) {
    // Deregister the partition change listener of the topic
    partitionStateMachine.deregisterPartitionChangeListener(topic)
    // Switch the successfully deleted replicas to NonExistentReplica
    val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
    replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
    // Switch the partitions of the topic to OfflinePartition, then to NonExistentPartition
    val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
    // Remove the topic and its partitions from the pending sets
    topicsToBeDeleted -= topic
    partitionsToBeDeleted.retain(_.topic != topic)
    // Remove the topic's data from ZK
    val zkUtils = controllerContext.zkUtils
    zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
    zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
    zkUtils.zkClient.delete(getDeleteTopicPath(topic))
    // Remove the topic from the Controller context
    controllerContext.removeTopic(topic)
}
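Step 5 sends an UpdateMetadataRequest to all available broker nodes to notify them that the topics are being deleted, and then deletes the replicas in the AR sets of the topics' partitions. A topic can be deleted only when both of the following conditions hold:
The topic is pending deletion and its deletion has not started yet.
The topic is not marked as ineligible for deletion.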
 
For a topic that satisfies both conditions, TopicDeletionManager#onTopicDeletion is called to perform the deletion:
private def onTopicDeletion(topics: Set[String]) {
    info("Topic deletion callback for %s".format(topics.mkString(",")))
    val partitions = topics.flatMap(controllerContext.partitionsForTopic)
    // Notify all available brokers that the partitions are being deleted
    controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
    // Group the AR sets by topic
    val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
    // Delete the partitions of each topic
    topics.foreach { topic =>
        this.onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).keySet)
    }
}
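The eligibility check that gates this call combines the two conditions listed earlier. A minimal sketch, assuming the three sets are passed in explicitly (in the text they are fields and helper predicates of TopicDeletionManager; the object and parameter names here are hypothetical):

```scala
object DeletionEligibilitySketch {
  // A topic is eligible when it is queued for deletion, its deletion has not
  // started yet, and it is not marked as ineligible for deletion.
  def isEligible(topic: String,
                 toBeDeleted: Set[String],
                 deletionInProgress: Set[String],
                 ineligible: Set[String]): Boolean =
    toBeDeleted.contains(topic) &&
      !deletionInProgress.contains(topic) &&
      !ineligible.contains(topic)
}
```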
TopicDeletionManager#onPartitionDeletion simply collects the AR sets of the topic's partitions and calls TopicDeletionManager#startReplicaDeletion to delete those replicas:
private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
    replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic =>
        // Live replicas of the topic
        val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic == topic)
        // Unavailable replicas of the topic
        val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
        // Replicas that have already been deleted successfully
        val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
        // Live replicas that still need to be deleted
        val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
        // Unavailable replicas cannot be deleted for now
        replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
        // Take the replicas to delete offline first
        replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
        debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
        // Start the deletion and register the callback for the StopReplicaResponse
        controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
            new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
        // A topic with unavailable replicas is marked as ineligible for deletion
        if (deadReplicasForTopic.nonEmpty) {
            debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
            markTopicIneligibleForDeletion(Set(topic))
        }
    }
}

private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractResponse, replicaId: Int) {
    val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
    debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
    val responseMap = stopReplicaResponse.responses.asScala
    // Partitions whose replica failed to be deleted
    val partitionsInError =
        if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySet
        else responseMap.filter { case (_, error) => error != Errors.NONE.code }.keySet
    val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
    inLock(controllerContext.controllerLock) {
        // Handle the replicas that failed to be deleted
        this.failReplicaDeletion(replicasInError)
        // Handle the replicas that were deleted successfully
        if (replicasInError.size != responseMap.size) {
            val deletedReplicas = responseMap.keySet -- partitionsInError
            this.completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
        }
    }
}
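A replica that fails to be deleted is switched to ReplicaDeletionIneligible, and the deletion thread is woken up to retry. A replica that is deleted successfully is switched to ReplicaDeletionSuccessful. Once all replicas of a pending topic are in the ReplicaDeletionSuccessful state, the DeleteTopicsThread thread performs the cleanup for that topic, which is step 2 analyzed above.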
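How the callback splits a response into failed and successful partitions can be sketched without any Kafka types. In the sketch below partitions are plain integers and error codes are integers with 0 meaning no error; both are simplifying assumptions made for illustration.

```scala
object StopReplicaOutcomeSketch {
  // responses: partition -> error code (0 = no error)
  // Returns (partitions that failed, partitions that succeeded).
  def split(topLevelError: Int, responses: Map[Int, Int]): (Set[Int], Set[Int]) = {
    val failed =
      // A top-level error fails every partition in the response
      if (topLevelError != 0) responses.keySet
      else responses.filter { case (_, error) => error != 0 }.keySet
    (failed, responses.keySet -- failed)
  }
}
```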
        
           
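The Kafka Controller provides a partition replica reassignment mechanism for assigning a new set of replicas to specified topic partitions. The mechanism is triggered when a Kafka Controller instance is elected leader, or when the administrator manually requests reassignment for certain topic partitions. This section uses the first scenario as the example. The manually triggered scenario is covered later in the analysis of the ZK listeners; the two differ only in their entry point, and the execution flow is the same.
We start with KafkaController#maybeTriggerPartitionReassignment, which is invoked when a Kafka Controller instance is elected leader. The Controller context defines the ControllerContext#partitionsBeingReassigned field, which records the topic partitions that need, or are undergoing, replica reassignment. KafkaController#maybeTriggerPartitionReassignment simply iterates over this field and calls KafkaController#initiateReassignReplicasForTopicPartition for each topic partition that needs reassignment.
Before analyzing the implementation, two terms need to be defined: RAR is the newly assigned AR set of a partition, and OAR is the partition's previous AR set. The method is implemented as follows: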
def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
    // Newly assigned replicas (RAR)
    val newReplicas = reassignedPartitionContext.newReplicas
    val topic = topicAndPartition.topic
    val partition = topicAndPartition.partition
    try {
        // Current AR set of the partition
        val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
        assignedReplicasOpt match {
            case Some(assignedReplicas) =>
                // The new assignment equals the current one: nothing to reassign
                if (assignedReplicas == newReplicas) {
                    throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) + " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
                } else {
                    info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
                    // Register a ReassignedPartitionsIsrChangeListener for the partition
                    this.watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
                    controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
                    // Mark the topic as ineligible for deletion
                    deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
                    // Perform the reassignment
                    this.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
                }
            case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist".format(topicAndPartition))
        }
    } catch {
        // ... (exception handling omitted)
    }
}
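If the newly assigned replica set differs from the current AR set, reassignment is triggered. Before it starts, a ReassignedPartitionsIsrChangeListener is registered for the topic partition and the topic that owns the partition is marked as ineligible for deletion. ReassignedPartitionsIsrChangeListener listens for changes to the partition's ISR set; its implementation is analyzed in a later section. The focus here is the implementation of KafkaController#onPartitionReassignment: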
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
    // Newly assigned replicas (RAR)
    val reassignedReplicas = reassignedPartitionContext.newReplicas
    // Some replica in RAR is not yet in the ISR
    if (!this.areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {
        info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) + "reassigned not yet caught up with the leader")
        // RAR - OAR
        val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
        // RAR + OAR
        val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
        // Use RAR + OAR as the AR set of the partition
        this.updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
        // Send a LeaderAndIsrRequest to every replica in RAR + OAR
        this.updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition), newAndOldReplicas.toSeq)
        // Switch the replicas in RAR - OAR to NewReplica
        this.startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
        info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) + "reassigned to catch up with the leader")
    }
    // All replicas in RAR are in the ISR
    else {
        // OAR - RAR
        val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
        // Switch every replica in RAR to OnlineReplica
        reassignedReplicas.foreach { replica =>
            replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), OnlineReplica)
        }
        // Elect a new leader if required
        this.moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
        // Stop the replicas in OAR - RAR
        this.stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
        // Update the AR set recorded in ZK
        this.updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
        // Remove the reassignment information from ZK and from the Controller context
        this.removePartitionFromReassignedPartitions(topicAndPartition)
        info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
        controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
        // Send an UpdateMetadataRequest to all available brokers
        this.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
        // Allow the topic to be deleted again and wake up the deletion thread
        deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
    }
}
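If one or more replicas in RAR are not in the ISR set, the reassignment proceeds as follows:
Use the union of OAR and RAR as the AR set of the topic partition, and write it to ZK and to the Controller context.
Send a LeaderAndIsrRequest to the brokers hosting all replicas in OAR + RAR to update the leader epoch of the partition cached on those nodes.
Switch the newly added replicas (RAR - OAR) to the NewReplica state.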
 
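In this scenario the reassignment is not yet complete. In practice, the RAR of most reassignments contains one or more replicas that are not in the ISR set, so the flow above can be regarded as the preparatory phase of a reassignment. After the new replicas have run for a while, caught up with the leader replica, and joined the ISR set one by one, the ReassignedPartitionsIsrChangeListener is triggered and calls back into the remaining flow, in which all replicas in RAR are in the ISR set.
If all replicas in RAR are in the partition's ISR set, the reassignment proceeds as follows:
Switch all replicas in RAR to the OnlineReplica state.
Update the AR set of the topic partition with RAR and, if the leader replica is not in RAR or its broker has failed, select a new leader replica using the ReassignedPartitionLeaderSelector partition leader selector.
Switch the old replicas (OAR - RAR) to the NonExistentReplica state.
Update the AR set of the partition recorded in ZK.
Remove the replica reassignment information of the topic partition from ZK and from the Controller context.
Send an UpdateMetadataRequest to all available broker nodes to propagate the cluster state after the reassignment.
Clear the ineligible-for-deletion mark on the topic (set earlier) and wake up the DeleteTopicsThread thread.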
 
The methods that implement the individual steps are straightforward and are not covered in more detail here.
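The replica sets used by the two phases are simple set operations on OAR and RAR. A minimal sketch, assuming replicas are identified by broker id (`Plan`, `plan` and `allInIsr` are hypothetical names used only for this illustration):

```scala
object ReassignmentSetsSketch {
  // interimAr: OAR + RAR, used as the AR set while new replicas catch up
  // replicasToStart: RAR - OAR, switched to NewReplica in the first phase
  // replicasToStop: OAR - RAR, retired in the second phase
  final case class Plan(interimAr: Set[Int], replicasToStart: Set[Int], replicasToStop: Set[Int])

  def plan(oar: Seq[Int], rar: Seq[Int]): Plan =
    Plan((rar ++ oar).toSet, rar.toSet -- oar.toSet, oar.toSet -- rar.toSet)

  // The second phase may run only once every replica in RAR is in the ISR
  def allInIsr(rar: Seq[Int], isr: Set[Int]): Boolean = rar.forall(isr.contains)
}
```

For example, moving a partition from brokers 1, 2, 3 to brokers 3, 4, 5 yields an interim AR set of all five brokers, starts replicas on 4 and 5, and eventually retires the replicas on 1 and 2.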
        
           
Kafka's interaction with ZK relies on zkclient.
        
           
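SessionExpirationListener implements the IZkStateListener interface and monitors the connection state between the Kafka Controller and ZK. It implements SessionExpirationListener#handleNewSession, which is called back when a new session is established with ZK and attempts to elect a new leader. The method is implemented as follows: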
def handleNewSession() {
    info("ZK expired; shut down all controller components and try to re-elect")
    // The leader recorded in ZK is not the current broker
    if (controllerElector.getControllerID != config.brokerId) {
        // Clean up state, since this broker may have been the leader before
        onControllerResignation()
        inLock(controllerContext.controllerLock) {
            // Try to become the new leader
            controllerElector.elect
        }
    } else {
        info("ZK expired, but the current controller id %d is the same as this broker id, skip re-elect".format(config.brokerId))
    }
}
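When the broker establishes a new session with ZK, the method above checks whether the leader recorded in ZK is the node hosting the current instance. If it is not, KafkaController#onControllerResignation is called to clean up state (because the current node may previously have been the leader), and then ZookeeperLeaderElector#elect is called to try to become the new leader based on ZK's ephemeral node mechanism. ZookeeperLeaderElector#elect is analyzed separately later. Here we look at the implementation of KafkaController#onControllerResignation first: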
def onControllerResignation() {
    debug("Controller resigning, broker id %d".format(config.brokerId))
    // Deregister the ZK listeners
    this.deregisterIsrChangeNotificationListener()
    this.deregisterReassignedPartitionsListener()
    this.deregisterPreferredReplicaElectionListener()
    // Shut down the topic deletion mechanism
    if (deleteTopicManager != null) deleteTopicManager.shutdown()
    // Shut down the automatic leader rebalance scheduler
    if (config.autoLeaderRebalanceEnable) autoRebalanceScheduler.shutdown()
    inLock(controllerContext.controllerLock) {
        // Deregister the ISR change listeners of the partitions being reassigned
        this.deregisterReassignedPartitionsIsrChangeListeners()
        // Shut down the partition state machine
        partitionStateMachine.shutdown()
        // Shut down the replica state machine
        replicaStateMachine.shutdown()
        // Shut down the channel manager used to talk to the brokers
        if (controllerContext.controllerChannelManager != null) {
            controllerContext.controllerChannelManager.shutdown()
            controllerContext.controllerChannelManager = null
        }
        // Reset the controller epoch
        controllerContext.epoch = 0
        controllerContext.epochZkVersion = 0
        // Switch the broker state to RunningAsBroker
        brokerState.newState(RunningAsBroker)
        info("Broker %d resigned as the controller".format(config.brokerId))
    }
}
The method above is triggered when the Kafka Controller is demoted from leader to follower. The individual steps are described by the code comments.
        
           
      
        
           
TopicChangeListener implements the IZkChildListener interface and listens on the /brokers/topics node. When a new topic is created or an existing topic is deleted, the corresponding callback is triggered:
def doHandleChildChange(parentPath: String, children: Seq[String]) {
    inLock(controllerContext.controllerLock) {
        if (hasStarted.get) {
            try {
                // Topics currently recorded in ZK
                val currentChildren = {
                    debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
                    children.toSet
                }
                // Newly added topics
                val newTopics = currentChildren -- controllerContext.allTopics
                // Deleted topics
                val deletedTopics = controllerContext.allTopics -- currentChildren
                // Update the locally recorded topic set
                controllerContext.allTopics = currentChildren
                // AR sets of the partitions of the new topics
                val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
                // Drop the AR sets of deleted topics and add those of the new topics
                controllerContext.partitionReplicaAssignment =
                        controllerContext.partitionReplicaAssignment.filter(p => !deletedTopics.contains(p._1.topic))
                controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
                info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics, deletedTopics, addedPartitionReplicaAssignment))
                // Bring the new topics online
                if (newTopics.nonEmpty)
                    controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
            } catch {
                case e: Throwable => error("Error while handling new topic", e)
            }
        }
    }
}
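The method above uses ZK to detect which topics have been added and which have been deleted, and updates the locally recorded set of available topics together with the AR sets of their partitions. For the newly added topics, the Kafka Controller calls KafkaController#onNewTopicCreation to register a PartitionModificationsListener for each topic and to switch the state of the topics' new partitions and replicas so that they can go online. The implementation is as follows: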
def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
    info("New topic creation callback for %s".format(newPartitions.mkString(",")))
    // Register a PartitionModificationsListener for each new topic
    topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
    // Bring the new partitions online
    this.onNewPartitionCreation(newPartitions)
}

def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
    info("New partition creation callback for %s".format(newPartitions.mkString(",")))
    // Switch the new partitions to NewPartition
    partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
    // Switch the replicas of the new partitions to NewReplica
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
    // Switch the new partitions to OnlinePartition
    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
    // Switch the replicas of the new partitions to OnlineReplica
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
}
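As the implementation shows, partitions and replicas are not switched to the Online state in one step. They first move to the New state and then to the Online state. Along the way, each partition is assigned a leader replica and an ISR set, and the controller tries to add the new replicas to the AR set of the corresponding partition.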
        
           
DeleteTopicsListener implements the IZkChildListener interface and watches the /admin/delete_topics node. When an administrator marks topics for deletion, the topics are written under this ZK node, which triggers the callback DeleteTopicsListener#doHandleChildChange, implemented as follows:
    def doHandleChildChange(parentPath: String, children: Seq[String]) {
        inLock(controllerContext.controllerLock) {
            // Topics requested for deletion
            var topicsToBeDeleted = children.toSet
            debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
            // Requested topics that do not exist: remove their deletion entries from ZK
            val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
            if (nonExistentTopics.nonEmpty) {
                warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
                nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
            }
            topicsToBeDeleted --= nonExistentTopics
            // Topic deletion is enabled (delete.topic.enable)
            if (controller.config.deleteTopicEnable) {
                if (topicsToBeDeleted.nonEmpty) {
                    info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
                    // Mark the topics that cannot be deleted right now
                    topicsToBeDeleted.foreach { topic =>
                        // Is a preferred replica election in progress for this topic?
                        val preferredReplicaElectionInProgress =
                            controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
                        // Is a replica reassignment in progress for this topic?
                        val partitionReassignmentInProgress =
                            controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
                        if (preferredReplicaElectionInProgress || partitionReassignmentInProgress)
                            controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
                    }
                    // Hand the topics over to TopicDeletionManager
                    controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
                }
            } else {
                // Topic deletion is disabled: remove the deletion entries from ZK
                for (topic <- topicsToBeDeleted) {
                    info("Removing " + getDeleteTopicPath(topic) + " since delete topic is disabled")
                    zkUtils.zkClient.delete(getDeleteTopicPath(topic))
                }
            }
        }
    }
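When new topics are marked for deletion, the method above obtains the set of topics to delete and checks whether each one is valid, that is, whether it actually exists. For invalid topics, the deletion entries are removed from ZK directly. For valid topics, provided the configuration (delete.topic.enable) allows deletion, the method checks whether the topic meets either of the following 2 conditions:
A preferred replica election is in progress for some of its partitions. 
A replica reassignment is in progress for some of its partitions. 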
 
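A topic that meets either condition is marked as ineligible for deletion for the time being. The valid topics are then submitted to TopicDeletionManager, which adds them and their partitions to its own pending-deletion sets and wakes up the DeleteTopicsThread thread to perform the deletion. For how TopicDeletionManager works, see the analysis in the earlier section.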
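The triage performed by the callback can be sketched with plain sets. This is a minimal illustration, not Kafka code: the object `DeleteTopicTriage`, the `Triage` result type, and the parameter names are hypothetical, and the sketch assumes topic deletion is enabled.

```scala
object DeleteTopicTriage {
  // Hypothetical result type used only for this illustration
  final case class Triage(nonExistent: Set[String], ineligible: Set[String], enqueued: Set[String])

  def triage(requested: Set[String],
             allTopics: Set[String],
             electionInProgress: Set[String],
             reassignmentInProgress: Set[String]): Triage = {
    // Requests for unknown topics are dropped
    val nonExistent = requested -- allTopics
    val valid = requested -- nonExistent
    // Topics busy with an election or a reassignment cannot be deleted yet
    val ineligible = valid.filter(t => electionInProgress(t) || reassignmentInProgress(t))
    // Every valid topic is queued; ineligible ones wait until they are resumed
    Triage(nonExistent, ineligible, valid)
  }
}
```

For example, requesting deletion of `a`, `b` and `ghost` when only `a`, `b`, `c` exist and `a` is undergoing an election drops `ghost`, marks `a` as ineligible, and queues both `a` and `b`.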
        
           
BrokerChangeListener implements the IZkChildListener interface and watches the /brokers/ids node. When a broker comes online or goes offline, the corresponding callback is triggered:
    def doHandleChildChange(parentPath: String, currentBrokerList: Seq[String]) {
        info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(",")))
        inLock(controllerContext.controllerLock) {
            if (hasStarted.get) {
                ControllerStats.leaderElectionTimer.time {
                    try {
                        // Brokers currently registered in ZK
                        val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
                        val curBrokerIds = curBrokers.map(_.id)
                        val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
                        // Brokers that have just come online
                        val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
                        // Brokers that have gone offline
                        val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
                        val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
                        // Update the cached set of live brokers
                        controllerContext.liveBrokers = curBrokers
                        val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
                        val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
                        val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
                        info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
                                .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
                        // Open connections to the new brokers
                        newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
                        // Close connections to the dead brokers
                        deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
                        // Handle broker startup
                        if (newBrokerIds.nonEmpty) controller.onBrokerStartup(newBrokerIdsSorted)
                        // Handle broker failure
                        if (deadBrokerIds.nonEmpty) controller.onBrokerFailure(deadBrokerIdsSorted)
                    } catch {
                        case e: Throwable => error("Error while handling broker changes", e)
                    }
                }
            }
        }
    }
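The callback detects new and dead brokers with two set differences. The sketch below isolates that step. `BrokerDiff` and its parameter names are hypothetical and stand in for the ids read from ZK and the ids cached in the controller context.

```scala
object BrokerDiff {
  /** Returns (newBrokerIds, deadBrokerIds). */
  def diff(idsInZk: Set[Int], idsKnownToController: Set[Int]): (Set[Int], Set[Int]) = {
    // In ZK but unknown to the controller: just came online
    val newBrokerIds = idsInZk -- idsKnownToController
    // Known to the controller but gone from ZK: went offline
    val deadBrokerIds = idsKnownToController -- idsInZk
    (newBrokerIds, deadBrokerIds)
  }
}
```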
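For brokers that have just come online, Kafka Controller opens network connections to them, notifies all available brokers in the cluster that new brokers have joined, and transitions the states of the partition replicas hosted on the new brokers so that they can start serving. The implementation is in KafkaController#onBrokerStartup: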
    def onBrokerStartup(newBrokers: Seq[Int]) {
        info("New broker startup callback for %s".format(newBrokers.mkString(",")))
        val newBrokersSet = newBrokers.toSet
        // Send UpdateMetadataRequest to all available brokers so they learn about the new brokers
        this.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
        // Move all replicas hosted on the new brokers to the OnlineReplica state
        val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
        replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
        // Try to move partitions in the NewPartition or OfflinePartition state to OnlinePartition
        partitionStateMachine.triggerOnlinePartitionStateChange()
        // Resume reassignments whose new replicas are hosted on the new brokers
        val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
            case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains)
        }
        partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
        // Resume deletion for topics that have replicas on the new brokers
        val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
        if (replicasForTopicsToBeDeleted.nonEmpty) {
            info("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. Signaling restart of topic deletion for these topics"
                    .format(replicasForTopicsToBeDeleted.mkString(","), deleteTopicManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
            deleteTopicManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
        }
    }
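For brokers that have gone offline, Kafka Controller closes its network connections to them and moves the replicas assigned to the failed brokers to the OfflineReplica state. If the leader replica of a partition happens to be on a failed broker, that partition is moved to the OfflinePartition state, and all available brokers in the cluster are notified. The implementation is in KafkaController#onBrokerFailure: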
    def onBrokerFailure(deadBrokers: Seq[Int]) {
        info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
        // Remove the dead brokers from the set of brokers that were shutting down
        val deadBrokersThatWereShuttingDown = deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
        info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
        val deadBrokersSet = deadBrokers.toSet
        // Partitions whose leader is on a dead broker (topics queued for deletion excluded) go to OfflinePartition
        val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
            deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
                    !deleteTopicManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
        partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
        // Try to bring the offline partitions back to the OnlinePartition state
        partitionStateMachine.triggerOnlinePartitionStateChange()
        // Move the replicas on the dead brokers (topics queued for deletion excluded) to OfflineReplica
        val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
        val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
        replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
        // Replicas of topics queued for deletion: mark their deletion as failed
        val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
        if (replicasForTopicsToBeDeleted.nonEmpty) {
            deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
        }
        // If no partition lost its leader, still send UpdateMetadataRequest so the brokers learn about the failure
        if (partitionsWithoutLeader.isEmpty) {
            this.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
        }
    }
           
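IsrChangeNotificationListener implements the IZkChildListener interface and watches the /isr_change_notification node. When it detects that the ISR sets of some partitions have changed, the corresponding callback is triggered: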
    def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit = {
        inLock(controller.controllerContext.controllerLock) {
            debug("ISR change notification listener fired")
            try {
                // Partitions whose ISR set has changed
                val topicAndPartitions = currentChildren.flatMap(getTopicAndPartition).toSet
                if (topicAndPartitions.nonEmpty) {
                    // Refresh the cached leader and ISR information of these partitions
                    controller.updateLeaderAndIsrCache(topicAndPartitions)
                    // Send UpdateMetadataRequest to all available brokers
                    processUpdateNotifications(topicAndPartitions)
                }
            } finally {
                // Delete the notification nodes that have been processed
                currentChildren.map(x => controller.controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x))
            }
        }
    }

    private def processUpdateNotifications(topicAndPartitions: Set[TopicAndPartition]) {
        val liveBrokers: Seq[Int] = controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq
        debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + topicAndPartitions)
        controller.sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
    }
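The logic is straightforward: refresh the cached leader and ISR information of the affected partitions, send an UpdateMetadataRequest to all available brokers, and finally delete the notification nodes that have been processed.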
        
           
      
        
           
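LeaderChangeListener implements the IZkDataListener interface and watches the /controller node. When the data of the node changes or the node is deleted, the corresponding callback is triggered: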
    def handleDataChange(dataPath: String, data: Object) {
        val shouldResign = inLock(controllerContext.controllerLock) {
            val amILeaderBeforeDataChange = amILeader
            // Record the id of the new leader
            leaderId = KafkaController.parseControllerId(data.toString)
            info("New leader is %d".format(leaderId))
            // True if this broker was the leader before the change and no longer is
            amILeaderBeforeDataChange && !amILeader
        }
        // This broker lost the leader role: run the resignation callback
        if (shouldResign) onResigningAsLeader()
    }

    def handleDataDeleted(dataPath: String) {
        val shouldResign = inLock(controllerContext.controllerLock) {
            debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader".format(brokerId, dataPath))
            amILeader
        }
        // If this broker was the leader, run the resignation callback first
        if (shouldResign) onResigningAsLeader()
        // Try to get elected as the new leader
        inLock(controllerContext.controllerLock) {
            elect
        }
    }
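The callback logic is described in the code comments. The implementation of ZookeeperLeaderElector#elect is analyzed in a later section. The callback ZookeeperLeaderElector#onResigningAsLeader is in fact KafkaController#onControllerResignation, which was analyzed earlier and is not repeated here.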
        
           
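PartitionModificationsListener implements the IZkDataListener interface and watches the /brokers/topics/{topic_name} node. When the partitions of a topic change (that is, when partitions are added, since the partition count can only grow), the corresponding callback is triggered: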
    def doHandleDataChange(dataPath: String, data: AnyRef) {
        inLock(controllerContext.controllerLock) {
            try {
                info(s"Partition modification triggered $data for path $dataPath")
                // Read the current replica assignment of the topic from ZK
                val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
                // Partitions present in ZK but not yet cached by the controller are the new ones
                val partitionsToBeAdded = partitionReplicaAssignment
                        .filter(p => !controllerContext.partitionReplicaAssignment.contains(p._1))
                // Skip topics that are queued for deletion
                if (controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
                    error("Skipping adding partitions %s for topic %s since it is currently being deleted".format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
                else {
                    if (partitionsToBeAdded.nonEmpty) {
                        info("New partitions to be added %s".format(partitionsToBeAdded))
                        // Cache the AR sets of the new partitions
                        controllerContext.partitionReplicaAssignment ++= partitionsToBeAdded
                        // Transition the new partitions and their replicas so they can go online
                        controller.onNewPartitionCreation(partitionsToBeAdded.keySet)
                    }
                }
            } catch {
                case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e)
            }
        }
    }
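For topics that are running normally, the callback above transitions the states of any newly added partitions and their replicas so that they can go online and start serving. The method KafkaController#onNewPartitionCreation was analyzed earlier and is not repeated here.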
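The detection of new partitions boils down to a key filter over two maps. A minimal sketch follows, in which `NewPartitionDiff` is a hypothetical name and a `(topic, partition)` tuple stands in for Kafka's `TopicAndPartition`.

```scala
object NewPartitionDiff {
  type Partition = (String, Int) // (topic, partition id), a stand-in for TopicAndPartition

  /** Partitions present in the assignment read from ZK but missing from the controller's cache. */
  def partitionsToBeAdded(zkAssignment: Map[Partition, Seq[Int]],
                          cachedAssignment: Map[Partition, Seq[Int]]): Map[Partition, Seq[Int]] =
    zkAssignment.filter { case (partition, _) => !cachedAssignment.contains(partition) }
}
```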
        
           
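PreferredReplicaElectionListener implements the IZkDataListener interface and watches the /admin/preferred_replica_election node. It elects the preferred replica as the leader replica for the specified topic partitions, which keeps leader replicas evenly distributed across the cluster. The callback is implemented as follows (logging omitted):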
    def doHandleDataChange(dataPath: String, data: AnyRef) {
        inLock(controllerContext.controllerLock) {
            // Partitions for which a preferred replica election was requested
            val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
            // Exclude partitions that are already undergoing a preferred replica election
            val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
            // Exclude partitions whose topic is queued for deletion
            val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
            // Run the preferred replica election for the remaining partitions
            controller.onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
        }
    }
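When an administrator manually requests a preferred replica election for certain topic partitions, the request is written under the /admin/preferred_replica_election node, which triggers the callback above. For the requested partitions whose topics are running normally, the callback eventually calls KafkaController#onPreferredReplicaElection, which elects the leader replica through the PreferredReplicaPartitionLeaderSelector partition leader selector. The method is implemented as follows: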
    def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) {
        info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
        try {
            // Record the partitions as undergoing a preferred replica election
            controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
            // Mark the topics as ineligible for deletion while the election runs
            deleteTopicManager.markTopicIneligibleForDeletion(partitions.map(_.topic))
            // Move the partitions to OnlinePartition, electing the leader with PreferredReplicaPartitionLeaderSelector
            partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
        } catch {
            case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
        } finally {
            // Clean up the election records of these partitions
            this.removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
            // Resume deletion of the topics
            deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
        }
    }
The implementation of PreferredReplicaPartitionLeaderSelector was analyzed earlier and is not repeated here.
        
           
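PartitionsReassignedListener implements the IZkDataListener interface and watches the /admin/reassign_partitions node. When an administrator requests a replica reassignment for certain topics, the request is written under this node, which triggers the corresponding callback: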
    def doHandleDataChange(dataPath: String, data: AnyRef) {
        debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s".format(dataPath, data))
        // Parse the requested reassignment
        val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
        // Exclude partitions that are already being reassigned
        val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
            partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
        }
        partitionsToBeReassigned.foreach { partitionToBeReassigned =>
            inLock(controllerContext.controllerLock) {
                // Skip partitions whose topic is queued for deletion
                if (controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
                    error("Skipping reassignment of partition %s for topic %s since it is currently being deleted".format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
                    controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
                } else {
                    val context = ReassignedPartitionsContext(partitionToBeReassigned._2)
                    // Start the replica reassignment for this partition
                    controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
                }
            }
        }
    }
The logic is simple and follows the code comments. The method KafkaController#initiateReassignReplicasForTopicPartition was analyzed earlier and is not repeated here.
        
           
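ReassignedPartitionsIsrChangeListener implements the IZkDataListener interface and watches for state changes of a given topic partition, specifically changes to its ISR set. The callback is implemented as follows (some logging omitted):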
    def doHandleDataChange(dataPath: String, data: AnyRef) {
        inLock(controllerContext.controllerLock) {
            debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
            val topicAndPartition = TopicAndPartition(topic, partition)
            try {
                controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
                    // The partition is being reassigned
                    case Some(reassignedPartitionContext) =>
                        // Read the current leader and ISR of the partition from ZK
                        val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition)
                        newLeaderAndIsrOpt match {
                            case Some(leaderAndIsr) =>
                                val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
                                // All replicas in the RAR set have joined the ISR set
                                if (caughtUpReplicas == reassignedReplicas) {
                                    // Resume the remaining reassignment steps
                                    controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
                                } else {
                                    // Some replicas have not caught up yet: wait for the next callback
                                }
                            case None => error("Error handling reassignment of partition %s to replicas %s as it was never created".format(topicAndPartition, reassignedReplicas.mkString(",")))
                        }
                    // The partition is not being reassigned: nothing to do
                    case None =>
                }
            } catch {
                case e: Throwable => error("Error while handling partition reassignment", e)
            }
        }
    }
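The main job of the callback above is to check whether all replicas in the RAR set of the partition being reassigned have joined the ISR set. If so, it triggers the follow-up steps in KafkaController#onPartitionReassignment. Otherwise it does nothing and waits for the next callback. Reading this listener together with section 7 above gives a clearer picture of the whole replica reassignment flow.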
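The caught-up check is a set intersection followed by an equality test. A minimal sketch, with `ReassignmentCheck` as a hypothetical name and plain broker ids as input:

```scala
object ReassignmentCheck {
  /** True when every replica in the RAR set is already part of the ISR set. */
  def isCaughtUp(reassignedReplicas: Set[Int], isr: Seq[Int]): Boolean = {
    val caughtUpReplicas = reassignedReplicas & isr.toSet
    caughtUpReplicas == reassignedReplicas
  }
}
```

With a RAR set of {4, 5} and an ISR of 1, 4, 5 the check passes. With a RAR set of {4, 5, 6} and the same ISR it fails, so the listener keeps waiting.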
        
           
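A Kafka cluster consists of multiple brokers, and each broker runs a Kafka Controller instance. Only one of these instances holds the leader role, while the rest are followers. When the leader goes down, the followers compete to become the new leader, which keeps the cluster available.
Kafka defines the ZookeeperLeaderElector class to handle failover, that is, to elect a new leader among the followers when the leader goes down. Its fields are defined as follows: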
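    class ZookeeperLeaderElector(controllerContext: ControllerContext, // controller context
                                 electionPath: String, // ZK path used for the election: /controller
                                 onBecomingLeader: () => Unit, // callback run when this broker becomes the leader
                                 onResigningAsLeader: () => Unit, // callback run when this broker resigns as leader
                                 brokerId: Int, // id of this broker
                                 time: Time // time utility
                                ) extends LeaderElector with Logging {

        // Id of the current leader, -1 when no leader is known
        var leaderId: Int = -1
        // Listener registered on the election path
        val leaderChangeListener = new LeaderChangeListener

        // ... methods omitted
    }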
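ZookeeperLeaderElector#startup is called when Kafka Controller starts, and it activates the failover mechanism. The method registers the LeaderChangeListener listener on the /controller node and then tries to get elected as the new leader. The implementation is as follows: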
    def startup {
        inLock(controllerContext.controllerLock) {
            // Register the LeaderChangeListener on the /controller node
            controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
            // Run the leader election
            elect
        }
    }
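ZookeeperLeaderElector#elect has been mentioned several times already. It performs the leader election and is triggered mainly in the following 3 scenarios:
When a Kafka Controller instance starts. 
When the data under the ZK node /controller is removed. 
When a broker re-establishes its session with ZK. 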
 
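Let us now look at the implementation of ZookeeperLeaderElector#elect. The method competes for the leader role through ZK ephemeral nodes and returns whether the current broker is the new leader: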
    def elect: Boolean = {
        val timestamp = time.milliseconds.toString
        val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
        // Read the id of the current leader from ZK
        leaderId = this.getControllerID
        // A leader already exists: stop the election
        if (leaderId != -1) {
            debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
            return amILeader
        }
        try {
            // Try to create the ephemeral node
            val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
                electString,
                controllerContext.zkUtils.zkConnection.getZookeeper,
                JaasUtils.isZkSecurityEnabled)
            zkCheckedEphemeral.create()
            info(brokerId + " successfully elected as leader")
            // The node was created: this broker is the new leader
            leaderId = brokerId
            // Run the callback for becoming the leader
            onBecomingLeader()
        } catch {
            // Another broker created the node first and became the leader
            case _: ZkNodeExistsException =>
                // Refresh the cached leader id
                leaderId = getControllerID
            case e2: Throwable =>
                error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
                // Give up the leader role
                resign()
        }
        // Report whether this broker is the leader
        amILeader
    }
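Leader election based on ZK ephemeral nodes is a typical and mature approach that is used in many distributed systems. The key property of an ephemeral node is that it is deleted when the broker that created it loses its session with ZK. If the node already exists, any other broker that tries to create it gets a ZkNodeExistsException. When the current broker wins the election, the callback ZookeeperLeaderElector#onBecomingLeader is invoked, which corresponds to KafkaController#onControllerFailover: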
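The election protocol can be illustrated without a real ZK server. The sketch below is a simplified model under stated assumptions: `FakeZk` is a hypothetical in-memory stand-in for ZK (a concurrent map whose `putIfAbsent` plays the role of the atomic ephemeral-node creation), `expireSession` simulates the node disappearing when its owner's session ends, and `Elector` mirrors only the control flow of `elect`, without callbacks or error handling.

```scala
import scala.collection.concurrent.TrieMap

object ElectionSketch {
  // Hypothetical in-memory stand-in for ZK: path -> id of the broker that created the node
  final class FakeZk {
    private val nodes = TrieMap.empty[String, Int]

    /** Atomically creates the node; returns false if it already exists. */
    def createEphemeral(path: String, owner: Int): Boolean =
      nodes.putIfAbsent(path, owner).isEmpty

    def read(path: String): Option[Int] = nodes.get(path)

    /** Simulates session expiry: removes the node only if `owner` created it. */
    def expireSession(path: String, owner: Int): Unit = {
      nodes.remove(path, owner)
      ()
    }
  }

  final class Elector(zk: FakeZk, electionPath: String, brokerId: Int) {
    var leaderId: Int = -1

    def amILeader: Boolean = leaderId == brokerId

    def elect(): Boolean = {
      // A leader already exists: just record it
      leaderId = zk.read(electionPath).getOrElse(-1)
      if (leaderId == -1) {
        // Whoever creates the node first wins; losers re-read the winner's id
        if (zk.createEphemeral(electionPath, brokerId)) leaderId = brokerId
        else leaderId = zk.read(electionPath).getOrElse(-1)
      }
      amILeader
    }
  }
}
```

If two electors share one `FakeZk`, the first call to `elect()` wins and the second elector records the winner's id. After `expireSession` removes the node, the next elector to call `elect()` takes over.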
    def onControllerFailover() {
        if (isRunning) {
            info("Broker %d starting become controller state transition".format(config.brokerId))
            // 1. Read the controller epoch from ZK
            this.readControllerEpochFromZookeeper()
            // 2. Increment the controller epoch
            this.incrementControllerEpoch(zkUtils.zkClient)
            // 3. Register the ZK listeners
            this.registerReassignedPartitionsListener()
            this.registerIsrChangeNotificationListener()
            this.registerPreferredReplicaElectionListener()
            partitionStateMachine.registerListeners()
            replicaStateMachine.registerListeners()
            // 4. Initialize the controller context
            this.initializeControllerContext()
            // 5. Send UpdateMetadataRequest to all available brokers
            this.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
            // 6. Start the replica state machine
            replicaStateMachine.startup()
            // 7. Start the partition state machine
            partitionStateMachine.startup()
            // 8. Register a PartitionModificationsListener for every topic
            controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
            info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
            // 9. Handle pending partition reassignments
            this.maybeTriggerPartitionReassignment()
            // 10. Handle pending preferred replica elections
            this.maybeTriggerPreferredReplicaElection()
            // 11. Start the partition rebalance task if auto.leader.rebalance.enable is set
            if (config.autoLeaderRebalanceEnable) {
                info("starting the partition rebalance scheduler")
                autoRebalanceScheduler.startup()
                autoRebalanceScheduler.schedule(
                    "partition-rebalance-thread",
                    checkAndTriggerPartitionRebalance,
                    5,
                    config.leaderImbalanceCheckIntervalSeconds.toLong,
                    TimeUnit.SECONDS)
            }
            // 12. Start the TopicDeletionManager
            deleteTopicManager.start()
        } else
            info("Controller has been shut down, aborting startup/failover")
    }
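The method above performs the initialization a broker needs once it has been elected as the new leader. Some of these steps were analyzed earlier, such as starting and initializing the state machines and the replica reassignment mechanism. The following focuses on step 4 (initializing the controller context) and step 11 (starting the partition rebalance task).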
        
           
Earlier we analyzed ControllerContext, the class that manages the Kafka Controller context. Step 4 initializes this context when a broker switches from the follower role to the leader role. The implementation is in KafkaController#initializeControllerContext:
    private def initializeControllerContext() {
        // Load the available brokers from ZK
        controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster.toSet
        // Load all topics from ZK
        controllerContext.allTopics = zkUtils.getAllTopics.toSet
        // Load the AR set of every partition from ZK
        controllerContext.partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSeq)
        controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
        controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
        // Load the leader and ISR information of every partition from ZK
        this.updateLeaderAndIsrCache()
        // Start the channel manager that connects to the brokers
        this.startChannelManager()
        // Initialize the partitions that need a preferred replica election
        this.initializePreferredReplicaElection()
        // Initialize the partitions that need a replica reassignment
        this.initializePartitionReassignment()
        // Initialize topic deletion
        this.initializeTopicDeletion()
    }
           
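The partition rebalancing mechanism described in this section is different from the partition assignment mechanism mentioned earlier in the analysis of consumers and the GroupCoordinator component. That mechanism assigns partitions to the consumers of a group, whereas partition rebalancing spreads the leader replicas of topic partitions as evenly as possible across brokers so that the load on each broker stays balanced.
Step 11 decides, based on the auto.leader.rebalance.enable configuration, whether to start the scheduled task partition-rebalance-thread, which applies the rebalancing policy to the topic partitions in the cluster. When a topic is created, its partitions and their leader replicas are spread across the brokers as evenly as possible. Over time, however, broker failures can gradually skew the number of leader replicas hosted on each broker, which puts a higher load on some brokers and eventually hurts Kafka performance. The partition-rebalance-thread task proactively finds the brokers whose leader distribution has drifted and rebalances the partition leader replicas.
The logic is in KafkaController#checkAndTriggerPartitionRebalance: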
    private def checkAndTriggerPartitionRebalance(): Unit = {
        if (isActive) {
            trace("checking need to trigger partition rebalance")
            // Partitions and their AR sets, grouped by the broker that hosts the preferred replica
            var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
            inLock(controllerContext.controllerLock) {
                preferredReplicasForTopicsByBrokers =
                        controllerContext.partitionReplicaAssignment
                                // Skip topics that are queued for deletion
                                .filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic))
                                // Group by the preferred replica, i.e. the first replica in the AR set
                                .groupBy { case (_, assignedReplicas) => assignedReplicas.head }
            }
            debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
            // Compute the imbalance ratio of each broker
            preferredReplicasForTopicsByBrokers.foreach { case (leaderBroker, topicAndPartitionsForBroker) =>
                var imbalanceRatio: Double = 0
                // Partitions whose current leader is not the preferred replica
                var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
                inLock(controllerContext.controllerLock) {
                    topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case (topicPartition, _) =>
                        controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
                                controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
                    }
                    debug("topics not in preferred replica " + topicsNotInPreferredReplica)
                    val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
                    val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
                    // Imbalance ratio of this broker
                    imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
                    trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
                }
                // The ratio exceeds the threshold (leader.imbalance.per.broker.percentage)
                if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
                    topicsNotInPreferredReplica.keys.foreach { topicPartition =>
                        inLock(controllerContext.controllerLock) {
                            // Run the election only when all of the conditions below hold
                            if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
                                    controllerContext.partitionsBeingReassigned.isEmpty &&
                                    controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
                                    !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
                                    controllerContext.allTopics.contains(topicPartition.topic)) {
                                // Elect the preferred replica as the leader
                                onPreferredReplicaElection(Set(topicPartition), isTriggeredByAutoRebalance = true)
                            }
                        }
                    }
                }
            }
        }
    }
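The rebalancing of partition leader replicas can be summarized as follows:
Collect all partitions with their replica sets, grouped by the ID of the preferred replica; 
Compute the imbalance ratio of each broker; 
When the imbalance ratio of a broker exceeds the threshold, run a preferred replica election if the conditions allow it. 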
 
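The imbalance ratio of a broker is computed over the partitions whose preferred replica is on that broker: it is the number of those partitions whose current leader is not the preferred replica, divided by the total number of those partitions. When the ratio exceeds the leader.imbalance.per.broker.percentage configuration, a preferred replica election is triggered for each affected partition, provided all of the following conditions hold:
The corresponding broker is alive. 
No partition is undergoing a replica reassignment. 
No partition is undergoing a preferred replica election. 
The topic of the partition exists and is running normally (it is not queued for deletion). 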
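The ratio can be worked through on a small example. The sketch below is illustrative only: `ImbalanceSketch`, its parameter names, and the use of plain strings as partition names are assumptions, and the threshold passed to `needsRebalance` is an arbitrary example value rather than a statement about any particular configuration.

```scala
object ImbalanceSketch {
  /**
   * assignment: partition name -> assigned replicas (the head is the preferred replica)
   * leaders:    partition name -> id of the current leader broker
   * Returns the imbalance ratio per preferred-replica broker.
   */
  def imbalanceRatios(assignment: Map[String, Seq[Int]], leaders: Map[String, Int]): Map[Int, Double] =
    assignment.groupBy { case (_, replicas) => replicas.head }.map {
      case (preferredBroker, partitions) =>
        // Partitions with a known leader that is not the preferred replica
        val notLedByPreferred = partitions.count {
          case (partition, _) => leaders.get(partition).exists(_ != preferredBroker)
        }
        preferredBroker -> (notLedByPreferred.toDouble / partitions.size)
    }

  /** Mirrors the comparison against the percentage threshold. */
  def needsRebalance(ratio: Double, thresholdPercent: Int): Boolean =
    ratio > thresholdPercent.toDouble / 100
}
```

Suppose broker 1 hosts the preferred replica of three partitions but currently leads only one of them. Its ratio is 2/3, which exceeds a 10 percent threshold, so the two stray partitions become candidates for a preferred replica election. A broker that leads all of its preferred partitions has a ratio of 0.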
 
        
           
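The BrokerChangeListener analyzed earlier handles brokers coming online and going offline, where going offline means that the broker has failed. In practice there is another kind of shutdown, one that an administrator triggers deliberately, for example when migrating to another data center, upgrading software, or changing the Kafka configuration. In this case the broker is running normally, and Kafka provides a gentler way to take it offline: the Controlled Shutdown mechanism. Compared with a broker crash, a controlled shutdown has the following advantages:
All log data is flushed to disk, which avoids log recovery when the broker comes back online. 
Partitions whose leader replica is on the broker being shut down can be migrated first, which keeps them available. 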
 
To perform a controlled shutdown of a target broker, an administrator can use the command-line tools to send a ControlledShutdownRequest to Kafka Controller. The request is handled in KafkaController#shutdownBroker, implemented as follows:
    def shutdownBroker(id: Int): Set[TopicAndPartition] = {
        // Only the active controller (the leader) may handle the request
        if (!isActive) {
            throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
        }
        controllerContext.brokerShutdownLock synchronized {
            info("Shutting down broker " + id)
            inLock(controllerContext.controllerLock) {
                // The target broker must exist
                if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id)) throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
                // Record the broker as shutting down
                controllerContext.shuttingDownBrokerIds.add(id)
            }
            // Partitions hosted on the broker, each paired with its replication factor
            val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] =
                inLock(controllerContext.controllerLock) {
                    controllerContext.partitionsOnBroker(id).map(topicAndPartition =>
                        (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
                }
            // Handle each partition hosted on the broker
            allPartitionsAndReplicationFactorOnBroker.foreach {
                case (topicAndPartition, replicationFactor) =>
                    inLock(controllerContext.controllerLock) {
                        controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
                            // Only replicated partitions can be handled
                            if (replicationFactor > 1) {
                                if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
                                    // The leader replica is on this broker:
                                    // elect a new leader and ISR with ControlledShutdownLeaderSelector
                                    partitionStateMachine.handleStateChanges(
                                        Set(topicAndPartition), OnlinePartition, controlledShutdownPartitionLeaderSelector)
                                }
                                else {
                                    try {
                                        // The broker only hosts a follower: send StopReplicaRequest without deleting the replica
                                        brokerRequestBatch.newBatch()
                                        brokerRequestBatch.addStopReplicaRequestForBrokers(
                                            Seq(id), topicAndPartition.topic, topicAndPartition.partition, deletePartition = false)
                                        brokerRequestBatch.sendRequestsToBrokers(epoch)
                                    } catch {
                                        // exception handling omitted in the original listing
                                    }
                                    // Move the replica to the OfflineReplica state
                                    replicaStateMachine.handleStateChanges(
                                        Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, id)), OfflineReplica)
                                }
                            }
                        }
                    }
            }

            // Replicated partitions that are still led by the broker
            def replicatedPartitionsBrokerLeads(): Iterable[TopicAndPartition] = inLock(controllerContext.controllerLock) {
                trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
                controllerContext.partitionLeadershipInfo.filter {
                    case (topicAndPartition, leaderIsrAndControllerEpoch) =>
                        leaderIsrAndControllerEpoch.leaderAndIsr.leader == id &&
                                controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
                }.keys
            }

            replicatedPartitionsBrokerLeads().toSet
        }
    }
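For each partition hosted on the broker being shut down, the method first checks whether the partition is replicated (replication factor greater than 1). If so, it checks whether the leader replica is on that broker. If it is, the ControlledShutdownLeaderSelector partition leader selector assigns a new leader replica and ISR set to the partition, and the result is sent to the relevant brokers in the cluster. If the leader replica is elsewhere, a StopReplicaRequest is sent to the broker to stop its replica. This may change the ISR set of the partition, and the change is likewise sent to the relevant brokers.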
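The per-partition decision can be condensed into a small pure function. This is a sketch of the branching only: the `Action` type and its cases are hypothetical names introduced for illustration, and none of the state-machine or request-sending side effects are modeled.

```scala
object ControlledShutdownSketch {
  sealed trait Action
  case object ElectNewLeader extends Action // the leader is on the broker being shut down
  case object StopFollower extends Action   // the broker only hosts a follower replica
  case object NoAction extends Action       // unreplicated partition: the loop skips it

  def actionFor(shuttingDownBroker: Int, leader: Int, replicationFactor: Int): Action =
    if (replicationFactor <= 1) NoAction
    else if (leader == shuttingDownBroker) ElectNewLeader
    else StopFollower
}
```

Note that an unreplicated partition led by the broker is left untouched, and it is also excluded from the set that `shutdownBroker` returns, since that set only contains partitions with more than one replica.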
        
           
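This article covered the functionality and implementation of the Kafka Controller component. A Kafka cluster runs multiple brokers that are independent of one another when they start. Kafka Controller coordinates these brokers so that they serve clients as a single cluster. It manages the states of all partitions and replicas in the cluster, maintains the cluster context, and provides replica reassignment, partition rebalancing, controlled shutdown, failover, and the interaction with ZK. It can be regarded as the central controller of a Kafka cluster.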