在分布式应用中,通常会引入冗余策略来保证集群中节点在宕机时的服务可用性,Kafka 在设计上也是如此。Kafka 会为每个 topic 分区创建多个副本,并将这些副本分散在多台 broker 节点上,以避免单点问题。一个分区的副本集合包含一个 leader 角色和多个 follower 角色,其中 leader 副本主要负责响应客户端对于指定 topic 分区消息的读写,并管理集合中的其它 follower 副本,而 follower 副本则主要负责与 leader 副本间保持数据同步,保证在 leader 副本失效时能够有新的 follower 选举成为新的 leader,以维持 Kafka 服务的正常运行。
Replica 组件
Replica 类用于定义 Kafka 中的副本,副本除了有前面介绍的 leader 和 follower 角色之分外,也区分 本地副本 和 远程副本 ,其中本地副本是指与其关联的 Log 对象位于相同 broker 节点上,而远程副本的 Log 对象则位于其它 broker 节点上。对于远程副本而言,当前 broker 节点仅维护其 LEO 位置信息。 远程副本的主要作用在于协助 leader 副本维护分区的 HW 位置值 ,具体过程将在后面分析 HW 位置管理时进行说明。
在前面介绍 Kafka 的日志存储机制时我们知道一个 topic 分区对应一个 Log 对象,而在设计上为了避免单点问题,一个 topic 分区又会包含多个副本,这些副本分布在多个不相同的 broker 节点上,如果某个副本正好位于其所属的 Log 对象所在的 broker 节点上,我们称之为本地副本,否则即为远程副本。
下面来看一下 Replica 类的字段定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 class Replica (val brokerId: Int , // 当前副本所在的 broker 的 ID val partition: Partition , // 当前副本所属的 topic 分区对象 time: Time = Time .SYSTEM , // 时间戳工具 initialHighWatermarkValue: Long = 0L, // 初始 HW 值 val log: Option [Log ] = None // 当前副本所属的 Log 对象,如果是远程副本,该字段为空,通过该字段可以区分是本地副本还是远程副本 ) extends Logging { @volatile private [this ] var highWatermarkMetadata = new LogOffsetMetadata (initialHighWatermarkValue) @volatile private [this ] var logEndOffsetMetadata = LogOffsetMetadata .UnknownOffsetMetadata @volatile private [this ] var lastFetchLeaderLogEndOffset = 0 L @volatile private [this ] var lastFetchTimeMs = 0 L @volatile private [this ] var _lastCaughtUpTimeMs = 0 L val topicPartition: TopicPartition = partition.topicPartition }
对于本地副本来说会持有所属 Log 对象的引用,可以基于这一点来判定当前副本是本地副本还是远程副本。此外,Replica 对象还记录了当前副本的 LEO 和 HW 值,以及最近一次从 leader 副本拉取消息的时间戳,同时还定义了相关方法用于维护这些信息,下面分别来看一下维护 LEO 和 HW 值的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def updateLogReadResult (logReadResult: LogReadResult ) { if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset) _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs) else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset) _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs) logEndOffset = logReadResult.info.fetchOffsetMetadata lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset lastFetchTimeMs = logReadResult.fetchTimeMs } private def logEndOffset_= (newLogEndOffset: LogOffsetMetadata ) { if (isLocal) { throw new KafkaException (s"Should not set log end offset on partition $topicPartition 's local replica $brokerId " ) } else { logEndOffsetMetadata = newLogEndOffset trace(s"Setting log end offset for replica $brokerId for partition $topicPartition to [$logEndOffsetMetadata ]" ) } }
方法 Replica#updateLogReadResult
用于更新当前 Replica 对象的 LEO 值。对于 follower 来说,当从 leader 完成一次消息同步操作后,follower 会更新本地记录的 LEO 值,并更新相应的时间戳信息,其中 _lastCaughtUpTimeMs
字段用于记录 follower 最近一次成功从 leader 拉取消息的时间戳,可以标识当前 follower 相对于 leader 的滞后程度。
由上面的实现可以看出,只有远程副本需要更新 LEO 值,因为远程副本未持有所属 Log 对象的引用,需要通过本地字段缓存当前副本的 LEO 值。Replica 类定义了 Replica#logEndOffset
方法用于获取当前副本的 LEO 值:
1 def logEndOffset : LogOffsetMetadata = if (isLocal) log.get.logEndOffsetMetadata else logEndOffsetMetadata
对于本地副本来说,可以调用其持有的 Log 对象的 Log#logEndOffsetMetadata
方法直接获取对应的 LEO 值,而对于远程副本来说则返回本地缓存的 LEO 值。
对于 HW 值而言,Replica 同样提供了更新的方法(如下),需要注意的一点是这里仅更新本地副本的 HW 值,因为远程副本所在的 broker 节点仅维护副本的 LEO 位置信息 :
1 2 3 4 5 6 7 8 9 def highWatermark_= (newHighWatermark: LogOffsetMetadata ) { if (isLocal) { highWatermarkMetadata = newHighWatermark trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark ]" ) } else { throw new KafkaException (s"Should not set high watermark on partition $topicPartition 's non-local replica $brokerId " ) } }
同时,Replica 也提供了获取当前副本 HW 值的方法,实现如下:
1 def highWatermark : LogOffsetMetadata = highWatermarkMetadata
Partition 组件
Partition 类用于定义 Kafka 中的分区,一个 topic 可以设置多个分区。前面在介绍 Kafka 架构与核心概念时曾提及过,Kafka 之所以需要引入分区的概念,主要是希望利用分布式系统中的多节点来提升 Kafka 集群的性能和可扩展性。因为一个 topic 的各个分区可以分布在不同的 broker 节点上,进而就能将 topic 的消息数据分散在这些 broker 节点上存储,对于消息的读写压力就可以由这些节点进行分摊。当我们感知到一个 topic 的消息读写量较大时,我们可以适当增加分区的数目来实现扩容的目的。设想如果我们不引入分区策略,而是由一个 broker 节点完整负责一个 topic,考虑每个 topic 之间的消息数据量和读写量可能存在较大差别,那么各个 broker 节点在负载均衡性上也会有较大的差异,最终影响的是集群整体的可用性。
此外,为了保证高可用性,Kafka 会为每个分区设置多个副本,Partition 提供了管理这些副本的方法,包括执行副本角色切换、维护 ISR 集合、管理 HW 值和 LEO 值,以及调用日志存储系统写入日志数据等。
下面来看一下 Partition 类的字段定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 class Partition (val topic: String , // 分区所属的 topic val partitionId: Int , // 分区编号 time: Time , // 时间戳工具 replicaManager: ReplicaManager // 副本管理 ) extends Logging with KafkaMetricsGroup { val topicPartition = new TopicPartition (topic, partitionId) private val localBrokerId = replicaManager.config.brokerId private val logManager = replicaManager.logManager private val zkUtils = replicaManager.zkUtils private val assignedReplicaMap = new Pool [Int , Replica ] @volatile private var leaderEpoch: Int = LeaderAndIsr .initialLeaderEpoch - 1 @volatile var leaderReplicaIdOpt: Option [Int ] = None @volatile var inSyncReplicas: Set [Replica ] = Set .empty[Replica ] private var controllerEpoch: Int = KafkaController .InitialControllerEpoch - 1 }
Partition 中提供了多种方法实现,按照功能划分可以将其中的核心方法划分为以下 5 类:
副本对象操作:getOrCreateReplica / getReplica / removeReplica
副本角色切换:makeLeader / makeFollower
日志数据操作:delete / appendRecordsToLeader
ISR 集合管理:maybeExpandIsr / maybeShrinkIsr
HW 和 LEO 位置管理:checkEnoughReplicasReachOffset / maybeIncrementLeaderHW / updateReplicaLogReadResult
下面按照分类对这些方法逐一进行分析。
副本对象操作
Partition 对象定义了 Partition#assignedReplicaMap
字段用于记录了隶属于当前分区的所有副本 Replica 对象,即 AR 集合,并提供了相关方法用于管理该字段。其中 Partition#getReplica
方法和 Partition#removeReplica
方法分别用于从字段中获取和移除指定副本 ID 对应的副本 Replica 对象,实现比较简单。
本小节我们主要对 Partition#getOrCreateReplica
方法进行分析,该方法相对于 Partition#getReplica
方法的区别在于当给定的副本 ID 在本地找不到对应的副本 Replica 对象时,会创建一个新的 Replica 对象。方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def getOrCreateReplica (replicaId: Int = localBrokerId): Replica = { assignedReplicaMap.getAndMaybePut(replicaId, { if (this .isReplicaLocal(replicaId)) { val config = LogConfig .fromProps(logManager.defaultConfig.originals, AdminUtils .fetchEntityConfig(zkUtils, ConfigType .Topic , topic)) val log = logManager.createLog(topicPartition, config) val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath) val offsetMap = checkpoint.read() val offset = math.min(offsetMap.getOrElse(topicPartition, 0 L), log.logEndOffset) new Replica (replicaId, this , time, offset, Some (log)) } else new Replica (replicaId, this , time) }) }
如果参数指定的副本 ID 对应的副本 Replica 对象在本地 AR 集合中不存在,则方法会执行创建对应的 Replica 对象。这里区分本地副本和远程副本,对于远程副本来说创建的过程如上述代码所示,比较简单,而对于本地副本来说,因为本地副本持有副本所属分区对应的 Log 对象,所以需要加载相关数据信息,包括配置、初始 HW 值,以及分区对应的 Log 对象。其中构造 Log 对象的过程由 LogManager#createLog
方法实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def createLog (topicPartition: TopicPartition , config: LogConfig ): Log = { logCreationOrDeletionLock synchronized { getLog(topicPartition).getOrElse { val dataDir = this .nextLogDir() val dir = new File (dataDir, topicPartition.topic + "-" + topicPartition.partition) dir.mkdirs() val log = new Log (dir, config, recoveryPoint = 0 L, scheduler, time) logs.put(topicPartition, log) info("Created log for partition [%s,%d] in %s with properties {%s}." .format(topicPartition.topic, topicPartition.partition, dataDir.getAbsolutePath, config.originals.asScala.mkString(", " ))) log } } }
副本角色切换
副本有 leader 和 follower 角色之分,Partition 分别提供了 Partition#makeLeader
方法和 Partition#makeFollower
方法用于将本地副本切换成相应的 leader 和 follower 角色。
切换本地副本为 leader 角色
方法 Partition#makeLeader
用于将本地副本切换成 leader 角色,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 def makeLeader (controllerId: Int , partitionStateInfo: PartitionState , correlationId: Int ): Boolean = { val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) { controllerEpoch = partitionStateInfo.controllerEpoch val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt) allReplicas.foreach(replica => getOrCreateReplica(replica)) val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica) inSyncReplicas = newInSyncReplicas leaderEpoch = partitionStateInfo.leaderEpoch zkVersion = partitionStateInfo.zkVersion val isNewLeader = if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) { false } else { leaderReplicaIdOpt = Some (localBrokerId) true } val leaderReplica = getReplica().get val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset val curTimeMs = time.milliseconds (assignedReplicas - leaderReplica).foreach { replica => val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0 L replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs) } if (isNewLeader) { leaderReplica.convertHWToLocalOffsetMetadata() assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult .UnknownLogReadResult )) } (maybeIncrementLeaderHW(leaderReplica), isNewLeader) } if (leaderHWIncremented) tryCompleteDelayedRequests() isNewLeader }
切换副本为 leader 角色的整体流程可以概括为:
更新本地记录的 kafka controller 的年代信息;
获取分区新的 AR 集合和 ISR 集合中所有副本对应的 Replica 对象,如果不存在则创建;
移除本地缓存的对应分区已经过期的副本 Replica 对象;
更新本地记录的分区 leader 副本的相关信息,包括 ISR 集合、leader 副本的年代信息等;
检测分区 leader 副本是否发生变化,如果当前副本之前是 follower 角色,或者对应的 topic 分区的副本之前未分配给当前 broker 节点,则说明对应 topic 分区的 leader 副本发生了变化;
遍历所有的 follower 副本,更新对应副本的相关时间戳信息,包括最近一次从 leader 副本拉取消息的时间戳,以及 leader 副本的 LEO 值等;
如果当前 leader 副本是新选举出来的,则尝试修正对应副本的 HW 值,并重置本地缓存的所有远程副本的相关信息;
尝试后移 leader 副本的 HW 值;
如果上一步后移了 leader 副本的 HW 值,则尝试执行监听当前 topic 分区的 DelayedFetch 和 DelayedProduce 延时任务,因为等待的条件可能已经满足。
其中,方法 Partition#maybeIncrementLeaderHW
用于尝试向后移动 leader 副本的 HW 值,相关实现我们将在本篇的后续部分进行分析。
切换本地副本为 follower 角色
方法 Partition#makeFollower
用于将本地副本切换成 follower 角色,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 def makeFollower (controllerId: Int , partitionStateInfo: PartitionState , correlationId: Int ): Boolean = { inWriteLock(leaderIsrUpdateLock) { val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt) val newLeaderBrokerId: Int = partitionStateInfo.leader controllerEpoch = partitionStateInfo.controllerEpoch allReplicas.foreach(r => getOrCreateReplica(r)) (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica) inSyncReplicas = Set .empty[Replica ] leaderEpoch = partitionStateInfo.leaderEpoch zkVersion = partitionStateInfo.zkVersion if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) { false } else { leaderReplicaIdOpt = Some (newLeaderBrokerId) true } } }
切换副本为 follower 角色的整体流程可以概括为:
更新本地记录的 kafka controller 的年代信息;
获取分区新的 AR 集合中所有副本对应的 Replica 对象,如果不存在则创建;
移除本地缓存的已经过期的副本 Replica 对象;
更新本地记录的分区 leader 副本的相关信息,因为 ISR 集合由 leader 副本管理,所以需要将 follower 副本记录的 ISR 集合置为空;
检测分区 leader 副本是否发生变化,如果发生变化则需要更新本地记录的 leader 副本的 ID。
相对于切换成 leader 角色来说,将本地副本切换成 follower 的过程要简单许多。
日志数据操作
Partition 提供了 Partition#delete
方法和 Partition#appendRecordsToLeader
方法用于操作日志数据,其中前者用于清空当前分区记录的副本相关信息,包括 AR 集合、ISR 集合,以及 leader 副本的 ID 值等信息,并异步删除分区对应的日志文件和索引文件(由 LogManager#asyncDelete
方法实现,会将日志文件和索引文件添加 .delete
标记删除后缀,并交由定时任务执行删除操作),而后者用于往当前分区的 leader 副本追加消息。删除操作的实现比较简单,这里重点来看一下往 leader 副本追加消息的过程,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 def appendRecordsToLeader (records: MemoryRecords , requiredAcks: Int = 0 ): LogAppendInfo = { val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) { leaderReplicaIfLocal match { case Some (leaderReplica) => val log = leaderReplica.log.get val minIsr = log.config.minInSyncReplicas val inSyncSize = inSyncReplicas.size if (inSyncSize < minIsr && requiredAcks == -1 ) { throw new NotEnoughReplicasException ( "Number of insync replicas for partition %s is [%d], below required minimum [%s]" .format(topicPartition, inSyncSize, minIsr)) } val info = log.append(records) replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey (this .topic, this .partitionId)) (info, maybeIncrementLeaderHW(leaderReplica)) case None => throw new NotLeaderForPartitionException ("Leader not local for partition %s on broker %d" .format(topicPartition, localBrokerId)) } } if (leaderHWIncremented) tryCompleteDelayedRequests() info }
首先我们多次提到的一点是,Kafka 只允许往目标 topic 分区的 leader 副本追加消息,而 follower 只能从 leader 副本同步消息,所以如果当前追加操作的是 follower 副本,则会抛出异常。
对于 leader 副本来说,在具体执行追加操作之前,如果用户指定了 acks 参数为 -1,即要求所有 ISR 副本在全部收到消息后才允许对客户端进行成功响应,那么会先检测当前分区的 ISR 集合中的副本数目是否大于等于配置的阈值(对应 min.insync.replicas
配置),如果数目不达标则会拒绝执行追加操作,防止数据丢失。具体追加消息数据的操作交由 Log#append
方法执行,该方法已经在前面的文章中分析过,这里不再重复撰述。完成了消息数据的追加操作后,Kafka 会立即尝试执行监听当前 topic 分区的 DelayedFetch 延时任务,避免让客户端和 follower 副本等待太久或超时,此外还会尝试后移 leader 副本的 HW 值。
ISR 集合管理
分区 leader 副本的一个重要的职责就是维护当前分区的 ISR 集合。在分布式应用中,考虑网络、机器性能等因素,follower 副本同步 leader 副本数据的状态是在动态变化的,如果一个 follower 副本与 leader 副本之间存在较大的同步延迟,则不应该被加入到 ISR 集合中,否则应该被纳入到 ISR 集合中的一员,从而能够在 leader 副本失效时,竞选成为新的 leader 副本,以保证 Kafka 服务的可用性。
Partition 类型分别定义了 Partition#maybeExpandIsr
方法和 Partition#maybeShrinkIsr
方法,用于将指定的副本在满足条件下加入到 ISR 集合中,以及依据给定的时间阈值将滞后于 leader 副本超过阈值时间的 follower 副本移出 ISR 集合。首先来看一下 Partition#maybeExpandIsr
方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 def maybeExpandIsr (replicaId: Int , logReadResult: LogReadResult ) { val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) { leaderReplicaIfLocal match { case Some (leaderReplica) => val replica = getReplica(replicaId).get val leaderHW = leaderReplica.highWatermark if (!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && replica.logEndOffset.offsetDiff(leaderHW) >= 0 ) { val newInSyncReplicas = inSyncReplicas + replica info(s"Expanding ISR for partition $topicPartition from ${inSyncReplicas.map(_.brokerId).mkString(",")} to ${newInSyncReplicas.map(_.brokerId).mkString(",")} " ) this .updateIsr(newInSyncReplicas) replicaManager.isrExpandRate.mark() } this .maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs) case None => false } } if (leaderHWIncremented) tryCompleteDelayedRequests() }
ISR 集合的扩张和收缩操作均由 leader 副本负责,对于给定的 follower 副本如果同时满足以下条件,则将其添加到 ISR 集合中:
目标 follower 副本不在当前分区的 ISR 集合中;
目标 follower 副本位于当前分区的 AR 集合中;
目标 follower 副本的 LEO 值已经追赶上对应 leader 副本的 HW 值。
对于同时满足上述条件的 follower 副本,Kafka 会将其添加到对应 topic 分区的 ISR 集合中,并将新的 ISR 集合信息记录到 ZK,同时更新 leader 副本本地记录的 ISR 集合。
方法 Partition#maybeShrinkIsr
用于收缩当前分区的 ISR 集合,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 def maybeShrinkIsr (replicaMaxLagTimeMs: Long ) { val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) { leaderReplicaIfLocal match { case Some (leaderReplica) => val outOfSyncReplicas = this .getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs) if (outOfSyncReplicas.nonEmpty) { val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas assert(newInSyncReplicas.nonEmpty) info("Shrinking ISR for partition [%s,%d] from %s to %s" .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString("," ), newInSyncReplicas.map(_.brokerId).mkString("," ))) this .updateIsr(newInSyncReplicas) replicaManager.isrShrinkRate.mark() this .maybeIncrementLeaderHW(leaderReplica) } else { false } case None => false } } if (leaderHWIncremented) tryCompleteDelayedRequests() } def getOutOfSyncReplicas (leaderReplica: Replica , maxLagMs: Long ): Set [Replica ] = { val candidateReplicas = inSyncReplicas - leaderReplica val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs) laggingReplicas }
对于 ISR 集合中的 follower 副本,如果其最近一次成功从 leader 副本拉取数据的时间戳相对于当前时间超过指定的阈值(对应 replica.lag.time.max.ms
配置,默认为 10 秒),则将其从 ISR 集合中移出,而不管当前 follower 副本与 leader 副本的数据延迟差异。一旦 follower 被从 ISR 踢出,Kafka 会将新的 ISR 集合信息上报给 ZK,同时更新 leader 副本本地记录的 ISR 集合。
HW 和 LEO 位置管理
Partition 定义了 Partition#checkEnoughReplicasReachOffset
方法和 Partition#maybeIncrementLeaderHW
方法,分别用于检测指定 offset 之前的消息是否已经被 ISR 集合中足够多的 follower 副本确认(ack),以及尝试向后移动 leader 副本的 HW 值。先来看一下 Partition#checkEnoughReplicasReachOffset
方法,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def checkEnoughReplicasReachOffset (requiredOffset: Long ): (Boolean , Errors ) = { leaderReplicaIfLocal match { case Some (leaderReplica) => val curInSyncReplicas = inSyncReplicas val minIsr = leaderReplica.log.get.config.minInSyncReplicas if (leaderReplica.highWatermark.messageOffset >= requiredOffset) { if (minIsr <= curInSyncReplicas.size) (true , Errors .NONE ) else (true , Errors .NOT_ENOUGH_REPLICAS_AFTER_APPEND ) } else { (false , Errors .NONE ) } case None => (false , Errors .NOT_LEADER_FOR_PARTITION ) } }
方法 Partition#checkEnoughReplicasReachOffset
接收一个 requiredOffset 参数,用于检测该 offset 之前的消息是否已经被确认,本质上就是将该 offset 与 leader 副本的 HW 值进行比较,如果 leader 副本的 HW 值大于等于该 offset 值,则认为之前的消息已经全部被确认。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private def maybeIncrementLeaderHW (leaderReplica: Replica , curTime: Long = time.milliseconds): Boolean = { val allLogEndOffsets = assignedReplicas.filter { replica => curTime - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs || inSyncReplicas.contains(replica) }.map(_.logEndOffset) val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata .OffsetOrdering ) val oldHighWatermark = leaderReplica.highWatermark if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) { leaderReplica.highWatermark = newHighWatermark debug("High watermark for partition [%s,%d] updated to %s" .format(topic, partitionId, newHighWatermark)) true } else { debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s" .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString("," ))) false } }
上述方法曾在前面的分析中多次出现,用于尝试后移 leader 副本的 HW 位置,其核心思想是选取 ISR 集合中副本最小的 LEO 值作为 leader 副本的新 HW 值,如果计算出来的 HW 值大于 leader 副本当前的 HW 值,则进行更新。考虑到一些位于 ISR 集合之外但是有机会加入 ISR 集合的副本加入 ISR 集合有一个延迟的过程,所以这里也考虑了这些滞后于 leader 副本时间较小的 follower 副本。
前面我们曾提及过远程副本的作用在于协助 leader 副本更新分区 HW 值,这里我们具体说明一下这一过程。分区 leader 副本所在的 broker 节点以远程副本的形式记录着所有 follower 副本的 LEO 值,当 follower 副本从 leader 副本同步数据时会告知 leader 副本从什么位置开始拉取数据,leader 副本会使用该 offset 值更新远程副本的 LEO 位置值。当 leader 副本需要更新分区 HW 值时会从所有远程副本中筛选出那些位于 ISR 集合中,或者与 leader 副本之间同步时间间隔位于 replica.lag.time.max.ms
内的副本,当这些副本中最小的 LEO 值大于当前 leader 副本的 HW 值时,则更新 leader 副本的 HW 值。
Partition 提供了 Partition#updateReplicaLogReadResult
方法用于更新指定 follower 副本的 LEO 值(具体通过调用 Replica#updateLogReadResult
方法实现),并在完成更新之后尝试调用 Partition#maybeExpandIsr
方法来扩张 ISR 集合,整体过程实现比较简单,不再展开。
Follower 副本在与 leader 副本进行数据同步时,会将从 leader 副本获取到的 HW 值与当前副本的 LEO 值进行比对,并选择较小者作为当前 follower 副本的 HW 值。这样就产生了一个问题,即 follower 副本的 HW 值与 leader 副本的 HW 值是有差距的,当选举某个 HW 滞后的 follower 副本作为新的 leader 时需要对数据进行截断,从而存在丢失消息的风险。为此,Kafka 0.11 版本引入了 Leader Epoch 机制以解决这一问题,关于 Leader Epoch 机制我们以后再补充说明。
ReplicaManager 组件
ReplicaManager 类用于管理分布在当前 broker 节点上的所有分区的副本信息,主要提供了创建并获取指定 topic 分区对象、副本管理、日志数据读写、副本角色转换,以及更新当前 broker 节点缓存的整个集群中全部分区的状态信息等功能。ReplicaManager 的字段定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class ReplicaManager (val config: KafkaConfig , // 相关配置对象 metrics: Metrics , time: Time , // 时间戳工具 val zkUtils: ZkUtils , // ZK 工具类 scheduler: Scheduler , // 定时任务调度器 val logManager: LogManager , // 用于对分区日志数据执行读写操作 val isShuttingDown: AtomicBoolean , // 标记 kafka 服务是否正在执行关闭操作 quotaManager: ReplicationQuotaManager , threadNamePrefix: Option [String ] = None ) extends Logging with KafkaMetricsGroup { @volatile var controllerEpoch: Int = KafkaController .InitialControllerEpoch - 1 private val localBrokerId = config.brokerId private val allPartitions = new Pool [TopicPartition , Partition ](Some (tp => new Partition (tp.topic, tp.partition, time, this ))) val replicaFetcherManager = new ReplicaFetcherManager (config, this , metrics, time, threadNamePrefix, quotaManager) private val highWatermarkCheckPointThreadStarted = new AtomicBoolean (false ) val highWatermarkCheckpoints: Predef .Map [String , OffsetCheckpoint ] = config.logDirs.map(dir => (new File (dir).getAbsolutePath, new OffsetCheckpoint (new File (dir, ReplicaManager .HighWatermarkFilename )))).toMap private var hwThreadInitialized = false private val isrChangeSet: mutable.Set [TopicPartition ] = new mutable.HashSet [TopicPartition ]() private val lastIsrChangeMs = new AtomicLong (System .currentTimeMillis()) private val lastIsrPropagationMs = new AtomicLong (System .currentTimeMillis()) val delayedProducePurgatory: DelayedOperationPurgatory [DelayedProduce ] = DelayedOperationPurgatory [DelayedProduce ]( purgatoryName = "Produce" , localBrokerId, config.producerPurgatoryPurgeIntervalRequests) val delayedFetchPurgatory: DelayedOperationPurgatory [DelayedFetch ] = DelayedOperationPurgatory [DelayedFetch ]( purgatoryName = "Fetch" , localBrokerId, config.fetchPurgatoryPurgeIntervalRequests) }
Kafka 服务在启动时会创建 ReplicaManager 对象,并调用 ReplicaManager#startup
方法启动 ReplicaManager 管理的定时任务,即 isr-expiration 和 isr-change-propagation 定时任务。实现如下:
1 2 3 4 5 6 def startup () { scheduler.schedule("isr-expiration" , maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 2 , unit = TimeUnit .MILLISECONDS ) scheduler.schedule("isr-change-propagation" , maybePropagateIsrChanges, period = 2500 L, unit = TimeUnit .MILLISECONDS ) }
定时任务 isr-expiration 周期性执行 ReplicaManager#maybeShrinkIsr
方法,尝试缩减当前 broker 节点管理的分区对应的 ISR 集合,具体缩减操作由 Partition#maybeShrinkIsr
方法实现,前面已经分析过,不再重复撰述。
定时任务 isr-change-propagation 周期性将 ISR 集合发生变化的 topic 副本信息更新到 ZK 相应节点下,Kafka 集群控制器基于 ZK 的 Watcher 机制监听相应节点,并在节点内容发生变化时向所有可用的 broker 节点发送 UpdateMetadataRequest 请求,以更新相应 broker 节点本地管理的整个集群中所有分区的状态信息。定时任务的执行逻辑由 ReplicaManager#maybePropagateIsrChanges
方法实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def maybePropagateIsrChanges () { val now = System .currentTimeMillis() isrChangeSet synchronized { if (isrChangeSet.nonEmpty && (lastIsrChangeMs.get() + ReplicaManager .IsrChangePropagationBlackOut < now || lastIsrPropagationMs.get() + ReplicaManager .IsrChangePropagationInterval < now)) { ReplicationUtils .propagateIsrChanges(zkUtils, isrChangeSet) isrChangeSet.clear() lastIsrPropagationMs.set(now) } } }
为了避免频繁操作 ZK,上述方法在设计上添加了一定的过滤条件,只有当最近一次 ISR 集合变化的时间距离现在超过 5 秒,或者距离上一次操作 ZK 已经超过 1 分钟,才允许再次操作 ZK。Kafka Controller 在成为 leader 角色时会在相应 ZK 路径上注册 Watcher 监听器,当监听到有数据变化时,会构建 UpdateMetadataRequest 请求对象发送给所有可用的 broker 节点,以更新 broker 节点本地缓存的整个集群所有分区的状态信息。
ReplicaManager 提供了 ReplicaManager#maybeUpdateMetadataCache
方法来处理 UpdateMetadataRequest 请求,方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def maybeUpdateMetadataCache (correlationId: Int , updateMetadataRequest: UpdateMetadataRequest , metadataCache: MetadataCache ): Seq [TopicPartition ] = { replicaStateChangeLock synchronized { if (updateMetadataRequest.controllerEpoch < controllerEpoch) { val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " + "old controller %d with epoch %d. Latest known controller epoch is %d" ).format(localBrokerId, correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, controllerEpoch) stateChangeLogger.warn(stateControllerEpochErrorMessage) throw new ControllerMovedException (stateControllerEpochErrorMessage) } else { val deletedPartitions = metadataCache.updateCache(correlationId, updateMetadataRequest) controllerEpoch = updateMetadataRequest.controllerEpoch deletedPartitions } } }
上述方法首先会校验当前 UpdateMetadataRequest 请求的年代信息,避免处理那些来自老的 kafka controller 的请求。对于合法的 UpdateMetadataRequest 请求,则会调用 MetadataCache#updateCache
方法更新所有分区的状态信息,并返回需要被移除的分区集合,同时更新本地缓存的 kafka controller 的年代信息。关于 MetadataCache 类的实现,留到后面针对性分析,这里先不展开。
除了上面介绍的 2 个定时任务以外,ReplicaManager 还定义了另外一个定时任务 highwatermark-checkpoint,该任务周期性将当前 broker 节点管理的每个 topic 分区的 HW 值更新到对应 log 目录下的 replication-offset-checkpoint 文件中。相关逻辑由 ReplicaManager#startHighWaterMarksCheckPointThread
方法实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 def startHighWaterMarksCheckPointThread (): Unit = { if (highWatermarkCheckPointThreadStarted.compareAndSet(false , true )) scheduler.schedule( "highwatermark-checkpoint" , checkpointHighWatermarks, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit .MILLISECONDS ) } def checkpointHighWatermarks () { val replicas = allPartitions.values.flatMap(_.getReplica(localBrokerId)) val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath) for ((dir, reps) <- replicasByDir) { val hwms: Map [TopicPartition , Long ] = reps.map(r => r.partition.topicPartition -> r.highWatermark.messageOffset).toMap try { highWatermarkCheckpoints(dir).write(hwms) } catch { case e: IOException => fatal("Error writing to highwatermark file: " , e) Runtime .getRuntime.halt(1 ) } } }
具体执行逻辑如代码注释,比较简单。该定时任务会在当前 ReplicaManager 首次收到来自 kafka controller 的 LeaderAndIsrRequest 请求时被启动。
消息同步机制
为了支持在 topic 分区 leader 副本失效时,有新的副本可以继续对外提供服务,Kafka 为副本引入了 leader/follower 模型设计,follower 副本在平时并不负责与客户端进行交互,主要职责在于从 leader 副本同步消息数据,以备在 leader 副本失效时可以从所有符合条件的 follower 副本中选举一个新的 leader 副本,从而避免对应 topic 的长时间停车,本小节我们重点来分析一下 follower 副本从 leader 副本同步消息的操作。
ReplicaManager 使用 ReplicaFetcherManager 管理 follower 副本与 leader 副本的同步工作,ReplicaFetcherManager 继承自 AbstractFetcherManager 抽象类。ReplicaFetcherManager 将当前 broker 节点管理的分区对应的副本按照一定的条件进行分组,并为每个组创建一个 fetcher 线程,用于从对应 leader 副本所在的 broker 节点拉取指定 offset 的消息数据。
Fetcher 线程由 ReplicaFetcherThread 实现,ReplicaFetcherThread 继承自 AbstractFetcherThread 抽象类。每个 ReplicaFetcherManager 维护了一个 HashMap[BrokerAndFetcherId, AbstractFetcherThread]
类型的 AbstractFetcherManager#fetcherThreadMap
集合,用于记录每个分组对应的 fetcher 线程对象,其中 BrokerAndFetcherId 封装了目标 broker 节点的 id、host、port,以及对应 fetcher 线程 ID 等信息。
ReplicaFetcherManager 提供了多个方法用于管理 AbstractFetcherManager#fetcherThreadMap
集合,主要包括:
AbstractFetcherManager#addFetcherForPartitions
:将指定的待同步 topic 分区分组,并为每个分组创建并启动一个 fetcher 线程,从指定的 offset 开始与 leader 副本进行同步。
AbstractFetcherManager#removeFetcherForPartitions
:停止对指定 topic 分区集合的副本同步任务。
AbstractFetcherManager#shutdownIdleFetcherThreads
:关闭空闲的 fetcher 线程,相应线程不再为任何 topic 分区执行同步工作。
上述方法中 2 和 3 在实现上都比较简单,下面重点来看一下方法 1,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 def addFetcherForPartitions (partitionAndOffsets: Map [TopicPartition , BrokerAndInitialOffset ]) { mapLock synchronized { val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicPartition, brokerAndInitialOffset) => BrokerAndFetcherId (brokerAndInitialOffset.broker, this .getFetcherId(topicPartition.topic, topicPartition.partition)) } for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) { var fetcherThread: AbstractFetcherThread = null fetcherThreadMap.get(brokerAndFetcherId) match { case Some (f) => fetcherThread = f case None => fetcherThread = this .createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker) fetcherThreadMap.put(brokerAndFetcherId, fetcherThread) fetcherThread.start() } fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (tp, brokerAndInitOffset) => tp -> brokerAndInitOffset.initOffset }) } } }
上述方法首先会考虑目标 broker 节点的网络位置信息(brokerId、host 和 port)和 fetcher 线程的 ID 对待同步的 topic 分区进行分组,并以这些信息作为对应 fetcher 线程对象在 AbstractFetcherManager#fetcherThreadMap
集合中的 key,如果 key 对应的 fetcher 线程对象不存在则会创建并启动新的线程,同时将待同步 topic 分区的同步起始 offset 传递给对应线程,然后唤醒线程执行。创建 fetcher 线程的实现如下:
1 2 3 4 5 6 7 override def createFetcherThread (fetcherId: Int , sourceBroker: BrokerEndPoint ): AbstractFetcherThread = { val threadName = threadNamePrefix match { case None => "ReplicaFetcherThread-%d-%d" .format(fetcherId, sourceBroker.id) case Some (p) => "%s:ReplicaFetcherThread-%d-%d" .format(p, fetcherId, sourceBroker.id) } new ReplicaFetcherThread (threadName, fetcherId, sourceBroker, brokerConfig, replicaMgr, metrics, time, quotaManager) }
ReplicaFetcherThread 继承自 ShutdownableThread 抽象方法,所以在线程被启动之后会循环调度执行 AbstractFetcherThread#doWork
方法,该方法会构造 FetchRequest 请求从 leader 副本拉取指定 offset 对应的消息数据,并处理 FetchResponse 响应。方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 override def doWork () { val fetchRequest = inLock(partitionMapLock) { val fetchRequest = this .buildFetchRequest(partitionStates.partitionStates.asScala.map { state => state.topicPartition -> state.value }) if (fetchRequest.isEmpty) { trace("There are no active partitions. Back off for %d ms before sending a fetch request" .format(fetchBackOffMs)) partitionMapCond.await(fetchBackOffMs, TimeUnit .MILLISECONDS ) } fetchRequest } if (!fetchRequest.isEmpty) this .processFetchRequest(fetchRequest) }
上述方法仅仅是构造了 FetchRequest 请求,而发送和处理响应的过程则由 AbstractFetcherThread#processFetchRequest
方法实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 private def processFetchRequest (fetchRequest: REQ ) { val partitionsWithError = mutable.Set [TopicPartition ]() var responseData: Seq [(TopicPartition , PD )] = Seq .empty try { trace("Issuing to broker %d of fetch request %s" .format(sourceBroker.id, fetchRequest)) responseData = this .fetch(fetchRequest) } catch { } fetcherStats.requestRate.mark() if (responseData.nonEmpty) { inLock(partitionMapLock) { responseData.foreach { case (topicPartition, partitionData) => val topic = topicPartition.topic val partitionId = topicPartition.partition Option (partitionStates.stateValue(topicPartition)).foreach(currentPartitionFetchState => if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.offset) { Errors .forCode(partitionData.errorCode) match { case Errors .NONE => try { val records = partitionData.toRecords val newOffset = records.shallowEntries.asScala.lastOption.map(_.nextOffset).getOrElse(currentPartitionFetchState.offset) fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math .max(0 L, partitionData.highWatermark - newOffset) this .processPartitionData(topicPartition, currentPartitionFetchState.offset, partitionData) val validBytes = records.validBytes if (validBytes > 0 ) { partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState (newOffset)) fetcherStats.byteRate.mark(validBytes) } } catch { } case Errors .OFFSET_OUT_OF_RANGE => try { val newOffset = this .handleOffsetOutOfRange(topicPartition) partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState (newOffset)) error("Current offset %d for partition [%s,%d] out of range; reset offset to %d" .format(currentPartitionFetchState.offset, topic, partitionId, newOffset)) } catch { } } }) } } } if (partitionsWithError.nonEmpty) { debug("handling partitions with error for %s" .format(partitionsWithError)) this .handlePartitionsWithErrors(partitionsWithError) } }
由上述实现可以看到方法 AbstractFetcherThread#processFetchRequest
主要做了两件事情:发送 FetchRequest 请求并阻塞等待响应,以及处理响应。其中发送 FetchRequest 请求的过程由 ReplicaFetcherThread#fetch
方法实现,该方法使用 NetworkClient 的阻塞版本 NetworkClientBlockingOps 向目标 broker 节点发送 FetchRequest 请求,并阻塞等待响应结果,然后将针对每个 topic 分区的响应结果封装成 PartitionData 对象交由后续处理。
在遍历处理对于每个 topic 分区的 FetchResponse 响应时,分为 3 种情况:
正常响应,拉回指定 offset 对应的消息数据。
异常响应,请求的 offset 不在 leader 副本允许的范围内。
其它异常响应。
对于 第 1 种情况 来说,会调用 ReplicaFetcherThread#processPartitionData
方法将从对应 leader 副本拉取回来的消息数据写入 follower 副本对应的 Log 对象中,并更新本地缓存的对应分区的消息同步状态信息。方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def processPartitionData (topicPartition: TopicPartition , fetchOffset: Long , partitionData: PartitionData ) { try { val replica = replicaMgr.getReplica(topicPartition).get val records = partitionData.toRecords this .maybeWarnIfOversizedRecords(records, topicPartition) if (fetchOffset != replica.logEndOffset.messageOffset) throw new RuntimeException ("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d." .format(topicPartition, fetchOffset, replica.logEndOffset.messageOffset)) replica.log.get.append(records, assignOffsets = false ) val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark) replica.highWatermark = new LogOffsetMetadata (followerHighWatermark) if (quota.isThrottled(topicPartition)) quota.record(records.sizeInBytes) } catch { case e: KafkaStorageException => fatal(s"Disk error while replicating data for $topicPartition " , e) Runtime .getRuntime.halt(1 ) } }
追加消息数据由对应副本持有的 Log 对象的 Log#append
方法完成,如果成功追加则会更新副本对应的 HW 值。
如果在执行同步操作时,某个 topic 分区出现异常,则需要依据对应的异常类型分别处理,如果是除 OFFSET_OUT_OF_RANGE
以外的错误(对应 第 3 种情况 ),则会暂停到对应分区 leader 副本同步数据的请求,休整一段时间(对应 replica.fetch.backoff.ms
配置)之后再继续,对应 ReplicaFetcherThread#handlePartitionsWithErrors
方法实现,比较简单。
下面来看一下 第 2 种情况 ,如果同步操作请求的 offset 不合法,即位于 leader 副本的 [startOffset, LEO]
之外,则需要修正本地缓存的对应副本的同步状态信息,修正 offset 的过程由 ReplicaFetcherThread#handleOffsetOutOfRange
方法实现,这里需要区分 2 种情况:
请求同步的 offset 大于对应 leader 副本的 LEO 值。
请求同步的 offset 小于对应 leader 副本的 startOffset 值。
一般 follower 副本的 LEO 值都是小于等于 leader 副本的 LEO 值,但是如果发生以下场景(unclean leader election),则可能出现 follower 副本的 LEO 值大于 leader 副本的 LEO 值,此时如果 follower 副本请求同步 leader 副本就有可能出现请求的 offset 大于目标 leader 副本的 LEO 值的情况。这类场景的发生过程为(令场景中 follower 副本为 F):
F 副本失效,期间 F 所属分区的 leader 副本继续追加消息数据;
F 副本失效后恢复,继续从 leader 副本同步数据,但是在追赶上 leader 副本之前,所有 ISR 集合中的副本全部失效;
为了保证 Kafka 服务的正常运行,选举 F 成为对应 topic 分区新的 leader 副本,并开始负责处理来自生产者的消息读写请求;
上一任 leader 从失效中恢复,并成为 follower 角色,此时其 LEO 值很有可能大于 F 的 LEO 值。
针对这种情况简单的处理方式是将 follower 副本的消息进行截断,但是 Kafka 也提供了 unclean.leader.election.enable
配置,允许在发生这种情况时停服。相关实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 def handleOffsetOutOfRange (topicPartition: TopicPartition ): Long = { val replica = replicaMgr.getReplica(topicPartition).get val leaderEndOffset: Long = this .earliestOrLatestOffset(topicPartition, ListOffsetRequest .LATEST_TIMESTAMP , brokerConfig.brokerId) if (leaderEndOffset < replica.logEndOffset.messageOffset) { if (!LogConfig .fromProps(brokerConfig.originals, AdminUtils .fetchEntityConfig(replicaMgr.zkUtils, ConfigType .Topic , topicPartition.topic)).uncleanLeaderElectionEnable) { fatal("Exiting because log truncation is not allowed for partition %s," .format(topicPartition) + " Current leader %d's latest offset %d is less than replica %d's latest offset %d" .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset)) System .exit(1 ) } warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d" .format(brokerConfig.brokerId, topicPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset)) replicaMgr.logManager.truncateTo(Map (topicPartition -> leaderEndOffset)) leaderEndOffset } else { } }
有时候 leader 副本的 LEO 值也会明显领先于某个 follower 副本的 LEO 值,此时 follower 请求同步 leader 副本时可能出现请求同步的 offset 小于对应 leader 副本的 startOffset 值。出现这种情况的原因一般有以下 2 种:
follower 副本长时间失效,期间 leader 副本不断在追加新的数据,等到 follower 再次上线时,leader 副本对应 offset 位置的日志数据已被定时任务清除。
出现前面介绍的 unclean leader election 场景,follower 在执行截断操作到 HW 位置后,offset 仍然大于新 leader 的 LEO 值,此时执行同步会导致 OffsetOutOfRangeException 异常,follower 在处理该异常的期间,leader 副本因为追加了大量的数据而导致 follower 再次请求同步时,offset 小于 leader 副本的 startOffset 值。
出现以上这 2 种情况只需要将 follower 同步请求同步的 offset 置为 leader 副本的 startOffset 即可,此外还需要清空 follower 副本的 Log 对象,因为其中的数据已经全部失效,没有继续保留的意义。相关实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def handleOffsetOutOfRange (topicPartition: TopicPartition ): Long = { val replica = replicaMgr.getReplica(topicPartition).get val leaderEndOffset: Long = this .earliestOrLatestOffset(topicPartition, ListOffsetRequest .LATEST_TIMESTAMP , brokerConfig.brokerId) if (leaderEndOffset < replica.logEndOffset.messageOffset) { } else { val leaderStartOffset: Long = this .earliestOrLatestOffset(topicPartition, ListOffsetRequest .EARLIEST_TIMESTAMP , brokerConfig.brokerId) warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d" .format(brokerConfig.brokerId, topicPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset)) val offsetToFetch = Math .max(leaderStartOffset, replica.logEndOffset.messageOffset) if (leaderStartOffset > replica.logEndOffset.messageOffset) replicaMgr.logManager.truncateFullyAndStartAt(topicPartition, leaderStartOffset) offsetToFetch } }
上述方法 AbstractFetcherThread#handleOffsetOutOfRange
还会在 AbstractFetcherThread#addPartitions
方法中被调用,该方法用于为每个 topic 分区构造合法的分区同步状态 PartitionFetchState 对象,并更新本地缓存,同时唤醒消息数据同步操作,前面分析过的 AbstractFetcherManager#addFetcherForPartitions
调用了该方法。实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def addPartitions (partitionAndOffsets: Map [TopicPartition , Long ]) { partitionMapLock.lockInterruptibly() try { val newPartitionToState = partitionAndOffsets .filter { case (tp, _) => !partitionStates.contains(tp) } .map { case (tp, offset) => val fetchState = if (PartitionTopicInfo .isOffsetInvalid(offset)) new PartitionFetchState (this .handleOffsetOutOfRange(tp)) else new PartitionFetchState (offset) tp -> fetchState } val existingPartitionToState = partitionStates.partitionStates.asScala.map { state => state.topicPartition -> state.value }.toMap partitionStates.set((existingPartitionToState ++ newPartitionToState).asJava) partitionMapCond.signalAll() } finally { partitionMapLock.unlock() } }
该方法中调用 AbstractFetcherThread#handleOffsetOutOfRange
方法的目的在于当参数未指定 offset 时,利用该方法获取合法的同步 offset 值。
副本角色切换
ReplicaManager 定义了 ReplicaManager#becomeLeaderOrFollower
方法,用于处理来自 kafka controller 的 LeaderAndIsrRequest 请求,指导位于当前 broker 节点上的相应分区副本的角色切换工作。方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 def becomeLeaderOrFollower (correlationId: Int , leaderAndISRRequest: LeaderAndIsrRequest , metadataCache: MetadataCache , onLeadershipChange: (Iterable [Partition ], Iterable [Partition ]) => Unit ): BecomeLeaderOrFollowerResult = { replicaStateChangeLock synchronized { val responseMap = new mutable.HashMap [TopicPartition , Short ] if (leaderAndISRRequest.controllerEpoch < controllerEpoch) { BecomeLeaderOrFollowerResult (responseMap, Errors .STALE_CONTROLLER_EPOCH .code) } else { val controllerId = leaderAndISRRequest.controllerId controllerEpoch = leaderAndISRRequest.controllerEpoch val partitionState = new mutable.HashMap [Partition , PartitionState ]() leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) => val partition = this .getOrCreatePartition(topicPartition) val partitionLeaderEpoch = partition.getLeaderEpoch if (partitionLeaderEpoch < stateInfo.leaderEpoch) { if (stateInfo.replicas.contains(localBrokerId)) partitionState.put(partition, stateInfo) else { responseMap.put(topicPartition, Errors .UNKNOWN_TOPIC_OR_PARTITION .code) } } else { responseMap.put(topicPartition, Errors .STALE_CONTROLLER_EPOCH .code) } } val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) => stateInfo.leader == localBrokerId } val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty) this .makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap) else Set .empty[Partition ] val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty) this .makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache) else Set .empty[Partition ] if (!hwThreadInitialized) { this .startHighWaterMarksCheckPointThread() hwThreadInitialized = true } replicaFetcherManager.shutdownIdleFetcherThreads() onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower) BecomeLeaderOrFollowerResult (responseMap, Errors .NONE .code) } } }
副本角色切换的整体执行流程可以概括为:
更新本地缓存的 kafka controller 的年代信息;
校验请求的合法性,确保请求操作对应的分区 leader 副本年代信息合法,以及请求操作的分区副本位于当前 broker 节点上;
对请求的分区副本按照角色分类,并执行角色切换;
如果 highwatermark-checkpoint 定时任务尚未启动,则执行启动;
关闭空闲的副本数据同步 fetcher 线程;
因为副本角色发生变化,可能影响消费者的消费操作,尝试执行 GroupCoordinator 迁移操作;
封装响应结果返回。
上面的步骤中我们重点来看一下步骤 3,关于 GroupCoordinator 将留到后面的篇章中针对性分析。步骤 3 首先会将待处理的副本集合按照角色分为 leader 和 follower 两组,然后针对 leader 分组调用 ReplicaManager#makeLeaders
方法将对应的分区切换成 leader 角色,调用 ReplicaManager#makeFollowers
方法将对应的分区切换成 follower 角色。
方法 ReplicaManager#makeLeaders
的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private def makeLeaders (controllerId: Int , epoch: Int , partitionState: Map [Partition , PartitionState ], correlationId: Int , responseMap: mutable.Map [TopicPartition , Short ]): Set [Partition ] = { for (partition <- partitionState.keys) responseMap.put(partition.topicPartition, Errors .NONE .code) val partitionsToMakeLeaders: mutable.Set [Partition ] = mutable.Set () try { replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition)) partitionState.foreach { case (partition, partitionStateInfo) => if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) partitionsToMakeLeaders += partition else } } catch { } partitionsToMakeLeaders }
切换副本角色为 leader 的过程比较简单,首先停止这些待切换 follower 副本的数据同步 fetcher 线程,然后调用 Partition#makeLeader
方法逐个将副本切换成 leader 角色,该方法已在前面分析过,不再重复撰述。
方法 ReplicaManager#makeFollowers
的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 private def makeFollowers (controllerId: Int , epoch: Int , partitionState: Map [Partition , PartitionState ], correlationId: Int , responseMap: mutable.Map [TopicPartition , Short ], metadataCache: MetadataCache ): Set [Partition ] = { for (partition <- partitionState.keys) responseMap.put(partition.topicPartition, Errors .NONE .code) val partitionsToMakeFollower: mutable.Set [Partition ] = mutable.Set () try { partitionState.foreach { case (partition, partitionStateInfo) => val newLeaderBrokerId = partitionStateInfo.leader metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match { case Some (_) => if (partition.makeFollower(controllerId, partitionStateInfo, correlationId)) partitionsToMakeFollower += partition else case None => partition.getOrCreateReplica() } } replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition)) logManager.truncateTo(partitionsToMakeFollower.map { partition => (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset) }.toMap) partitionsToMakeFollower.foreach { partition => val topicPartitionOperationKey = new TopicPartitionOperationKey (partition.topicPartition) this .tryCompleteDelayedProduce(topicPartitionOperationKey) this .tryCompleteDelayedFetch(topicPartitionOperationKey) } if (isShuttingDown.get()) { } else { val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => partition.topicPartition -> BrokerAndInitialOffset ( metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName), partition.getReplica().get.logEndOffset.messageOffset)).toMap replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) } } catch { } partitionsToMakeFollower }
切换副本为 follower 角色的过程相对要复杂一些,整体执行流程可以概括为:
检测对应新的 leader 副本所在 broker 节点是否可用,如果不可用则无需执行切换操作,否则调用 Partition#makeFollower
方法执行副本角色切换;
停止待切换副本的数据同步 fetcher 线程;
由于 leader 副本发生变化,新旧 leader 在 [HW, LEO]
之间的数据可能不一致,所以需要将当前副本截断到 HW 位置,以保证数据一致性;
尝试完成监听对应分区的 DelayedProduce 和 DelayedFetch 延时任务;
为新的 follower 副本集合创建并启动对应的数据同步 fetcher 线程(如果已存在,则复用)。
上述过程中涉及到的相关方法已经在前面分析过,不再重复撰述。
分区与副本管理
ReplicaManager 定义了 ReplicaManager#getOrCreatePartition
方法和 ReplicaManager#getPartition
方法用于获取本地缓存的指定 topic 分区的 Partition 对象,二者的区别在于前者会在本地检索不到目标 topic 分区时创建对应的 Partition 对象。同时,ReplicaManager 还提供了 ReplicaManager#getReplicaOrException
、ReplicaManager#getLeaderReplicaIfLocal
,以及 ReplicaManager#getReplica
方法用于获取指定 topic 分区的指定副本对象,实现上都比较简单,不展开分析。
下面来重点看一下关闭副本的 ReplicaManager#stopReplicas
方法实现,当 broker 节点收到来自 kafka controller 的 StopReplicaRequest 请求时,会关闭指定的副本,包括停止副本的数据同步 fetcher 线程,以及依据参数决定是否删除副本对应的 Log 对象和文件,并清空本地缓存的相关信息。方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 def stopReplicas (stopReplicaRequest: StopReplicaRequest ): (mutable.Map [TopicPartition , Short ], Short ) = { replicaStateChangeLock synchronized { val responseMap = new collection.mutable.HashMap [TopicPartition , Short ] if (stopReplicaRequest.controllerEpoch() < controllerEpoch) { (responseMap, Errors .STALE_CONTROLLER_EPOCH .code) } else { val partitions = stopReplicaRequest.partitions.asScala controllerEpoch = stopReplicaRequest.controllerEpoch replicaFetcherManager.removeFetcherForPartitions(partitions) for (topicPartition <- partitions) { val errorCode = this .stopReplica(topicPartition, stopReplicaRequest.deletePartitions()) responseMap.put(topicPartition, errorCode) } (responseMap, Errors .NONE .code) } } } def stopReplica (topicPartition: TopicPartition , deletePartition: Boolean ): Short = { val errorCode = Errors .NONE .code getPartition(topicPartition) match { case Some (_) => if (deletePartition) { val removedPartition = allPartitions.remove(topicPartition) if (removedPartition != null ) { removedPartition.delete() val topicHasPartitions = allPartitions.keys.exists(tp => topicPartition.topic == tp.topic) if (!topicHasPartitions) BrokerTopicStats .removeMetrics(topicPartition.topic) } } case None => if (deletePartition && logManager.getLog(topicPartition).isDefined) logManager.asyncDelete(topicPartition) } errorCode }
如果在 StopReplicaRequest 请求中指明了要删除对应 topic 分区的日志和索引文件,则方法会调用 Partition#delete
方法执行删除操作,并清空本地缓存的相关信息。如果某个 broker 节点在宕机中恢复后,之前管理的 topic 分区很可能已经被分配到新的 broker 节点上,此时该 broker 节点已经不再管理相应的 topic 分区对象,如果收到相应的 StopReplicaRequest 请求,则仍然会调用 LogManager#asyncDelete
方法尝试删除之前遗留的日志文件和索引文件。
日志数据读写
ReplicaManager 提供了 ReplicaManager#appendRecords
方法,用于处理 ProduceRequest 请求,将给定的日志数据追加到对应 topic 分区的 leader 副本中。方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 def appendRecords (timeout: Long , requiredAcks: Short , internalTopicsAllowed: Boolean , entriesPerPartition: Map [TopicPartition , MemoryRecords ], responseCallback: Map [TopicPartition , PartitionResponse ] => Unit ) { if (this .isValidRequiredAcks(requiredAcks)) { val sTime = time.milliseconds val localProduceResults = this .appendToLocalLog(internalTopicsAllowed, entriesPerPartition, requiredAcks) debug("Produce to local log in %d ms" .format(time.milliseconds - sTime)) val produceStatus = localProduceResults.map { case (topicPartition, result) => topicPartition -> ProducePartitionStatus ( result.info.lastOffset + 1 , new PartitionResponse (result.error, result.info.firstOffset, result.info.logAppendTime)) } if (this .delayedRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) { val produceMetadata = ProduceMetadata (requiredAcks, produceStatus) val delayedProduce = new DelayedProduce (timeout, produceMetadata, this , responseCallback) val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey (_)).toSeq delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) } else { val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) responseCallback(produceResponseStatus) } } else { val responseStatus = entriesPerPartition.map { case (topicPartition, _) => topicPartition -> new PartitionResponse ( Errors .INVALID_REQUIRED_ACKS , LogAppendInfo .UnknownLogAppendInfo .firstOffset, Record .NO_TIMESTAMP ) } responseCallback(responseStatus) } }
如果请求的 acks 参数合法,则会调用 ReplicaManager#appendToLocalLog
方法往相应 leader 副本对应的 Log 对象中追加日志数据,并依据以下条件决定是否延时响应:
acks 参数为 -1,表示需要 ISR 集合中全部的 follower 副本确认追加的消息数据;
请求添加的消息数据不为空;
至少有一个 topic 分区的消息追加成功。
如果上面 3 个条件同时满足,则方法会创建对应的 DelayedProduce 延时任务对象,并交由相应的炼狱进行管理。DelayedProduce 对象封装了响应回调函数(即 KafkaApis#handleProducerRequest
方法中定义的 sendResponseCallback 方法),当 ISR 集合中所有的 follower 副本完成对本次追加的日志数据的同步操作之后会触发响应操作,这里延时任务监听的 key 是 topic 分区对象,当某个 topic 分区完成消息追加操作时可以提前触发延时任务执行。关于 DelayedProduce 延时任务我们已经在前面分析过,读者可以将上述逻辑与上一篇中对 DelayedProduce 的分析结合起来进一步加深理解。
方法 ReplicaManager#appendToLocalLog
的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private def appendToLocalLog (internalTopicsAllowed: Boolean , entriesPerPartition: Map [TopicPartition , MemoryRecords ], requiredAcks: Short ): Map [TopicPartition , LogAppendResult ] = { entriesPerPartition.map { case (topicPartition, records) => if (Topic .isInternal(topicPartition.topic) && !internalTopicsAllowed) { (topicPartition, LogAppendResult ( LogAppendInfo .UnknownLogAppendInfo , Some (new InvalidTopicException (s"Cannot append to internal topic ${topicPartition.topic} " )))) } else { try { val partitionOpt = this .getPartition(topicPartition) val info = partitionOpt match { case Some (partition) => partition.appendRecordsToLeader(records, requiredAcks) case None => throw new UnknownTopicOrPartitionException ("Partition %s doesn't exist on %d" .format(topicPartition, localBrokerId)) } (topicPartition, LogAppendResult (info)) } catch { } } } }
上述方法最终调用了 Partition#appendRecordsToLeader
方法将消息数据追加到指定 topic 分区的 leader 副本中。
ReplicaManager 定义了 ReplicaManager#fetchMessages
方法,用于处理来自消费者或 follower 副本读取消息数据的 FetchRequest 请求。方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 def fetchMessages (timeout: Long , replicaId: Int , fetchMinBytes: Int , fetchMaxBytes: Int , hardMaxBytesLimit: Boolean , fetchInfos: Seq [(TopicPartition , PartitionData )], quota: ReplicaQuota = UnboundedQuota , responseCallback: Seq [(TopicPartition , FetchPartitionData )] => Unit ) { val isFromFollower = replicaId >= 0 val fetchOnlyFromLeader: Boolean = replicaId != Request .DebuggingConsumerId val fetchOnlyCommitted: Boolean = !Request .isValidBrokerId(replicaId) val logReadResults = this .readFromLocalLog( replicaId = replicaId, fetchOnlyFromLeader = fetchOnlyFromLeader, readOnlyCommitted = fetchOnlyCommitted, fetchMaxBytes = fetchMaxBytes, hardMaxBytesLimit = hardMaxBytesLimit, readPartitionInfo = fetchInfos, quota = quota) if (Request .isValidBrokerId(replicaId)) this .updateFollowerLogReadResults(replicaId, logReadResults) val logReadResultValues = logReadResults.map { case (_, v) => v } val bytesReadable = logReadResultValues.map(_.info.records.sizeInBytes).sum val errorReadingData = logReadResultValues.foldLeft(false )( (errorIncurred, readResult) => errorIncurred || (readResult.error != Errors .NONE )) if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) { val fetchPartitionData = logReadResults.map { case (tp, result) => tp -> FetchPartitionData (result.error, result.hw, result.info.records) } responseCallback(fetchPartitionData) } else { val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) => val fetchInfo = fetchInfos.collectFirst { case (tp, v) if tp == topicPartition => v }.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos" )) (topicPartition, FetchPartitionStatus (result.info.fetchOffsetMetadata, fetchInfo)) } val fetchMetadata = FetchMetadata (fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader, fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus) val delayedFetch = new DelayedFetch (timeout, fetchMetadata, this , quota, responseCallback) val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey (tp) } delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) } }
从指定 topic 分区 leader 副本拉取消息的整体执行流程如下:
从本地副本读取指定位置和大小的消息数据;
如果是来自 follower 副本的请求,则更新对应的 follower 副本的状态信息,并尝试扩张对应 topic 分区的 ISR 集合,同时尝试执行监听该分区的 DelayedProduce 延时任务;
判定是否需要对请求方进行立即响应,如果需要则立即触发响应回调函数;
否则,构造 DelayedFetch 延时任务,监听对应的 topic 分区对象,并交由炼狱管理。
下面对上述各个步骤逐一进行分析,首先来看 步骤 1 ,对应 ReplicaManager#readFromLocalLog
方法,实现了从本地读取指定 topic 分区相应位置和大小的消息数据的功能,具体的消息数据读操作由 Log#read
方法实现。方法 ReplicaManager#readFromLocalLog
实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 def readFromLocalLog (replicaId: Int , fetchOnlyFromLeader: Boolean , readOnlyCommitted: Boolean , fetchMaxBytes: Int , hardMaxBytesLimit: Boolean , readPartitionInfo: Seq [(TopicPartition , PartitionData )], quota: ReplicaQuota ): Seq [(TopicPartition , LogReadResult )] = { def read (tp: TopicPartition , fetchInfo: PartitionData , limitBytes: Int , minOneMessage: Boolean ): LogReadResult = { val offset = fetchInfo.offset val partitionFetchSize = fetchInfo.maxBytes try { val localReplica = if (fetchOnlyFromLeader) getLeaderReplicaIfLocal(tp) else getReplicaOrException(tp) val maxOffsetOpt = if (readOnlyCommitted) Some (localReplica.highWatermark.messageOffset) else None val initialLogEndOffset = localReplica.logEndOffset.messageOffset val initialHighWatermark = localReplica.highWatermark.messageOffset val fetchTimeMs = time.milliseconds val logReadInfo = localReplica.log match { case Some (log) => val adjustedFetchSize = math.min(partitionFetchSize, limitBytes) val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage) if (shouldLeaderThrottle(quota, tp, replicaId)) FetchDataInfo (fetch.fetchOffsetMetadata, MemoryRecords .EMPTY ) else if (!hardMaxBytesLimit && fetch.firstEntryIncomplete) FetchDataInfo (fetch.fetchOffsetMetadata, MemoryRecords .EMPTY ) else fetch case None => error(s"Leader for partition $tp does not have a local log" ) FetchDataInfo (LogOffsetMetadata .UnknownOffsetMetadata , MemoryRecords .EMPTY ) } LogReadResult (info = logReadInfo, hw = initialHighWatermark, leaderLogEndOffset = initialLogEndOffset, fetchTimeMs = fetchTimeMs, readSize = partitionFetchSize, exception = None ) } catch { } } var limitBytes = fetchMaxBytes val result = new mutable.ArrayBuffer [(TopicPartition , LogReadResult )] var minOneMessage = !hardMaxBytesLimit readPartitionInfo.foreach { case (tp, fetchInfo) => val readResult = read(tp, fetchInfo, limitBytes, minOneMessage) val messageSetSize = readResult.info.records.sizeInBytes if (messageSetSize > 0 ) minOneMessage = false limitBytes = math.max(0 , limitBytes - messageSetSize) result += (tp -> readResult) } result }
如果本次请求是由 follower 副本发起,则会执行 ReplicaManager#updateFollowerLogReadResults
方法( 步骤 2 ),该方法主要做了以下 4 件事情:
更新指定 follower 副本的状态信息(包括 LEO 值、最近一次成功从 leader 拉取消息的时间戳等);
尝试扩张副本所属分区的 ISR 集合,因为 follower 的 LEO 值递增,可能已经符合加入 ISR 集合的条件;
因为有新的消息被成功追加,尝试后移对应 leader 副本的 HW 值;
尝试执行监听对应 topic 分区的 DelayedProduce 延时任务。
方法 ReplicaManager#updateFollowerLogReadResults
的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private def updateFollowerLogReadResults (replicaId: Int , readResults: Seq [(TopicPartition , LogReadResult )]) { debug("Recording follower broker %d log read results: %s " .format(replicaId, readResults)) readResults.foreach { case (topicPartition, readResult) => getPartition(topicPartition) match { case Some (partition) => partition.updateReplicaLogReadResult(replicaId, readResult) this .tryCompleteDelayedProduce(new TopicPartitionOperationKey (topicPartition)) case None => warn("While recording the replica LEO, the partition %s hasn't been created." .format(topicPartition)) } } }
步骤 3 会判定是否需要立即响应当前拉取消息的 FetchRequest 请求,如果满足以下条件之一则执行回调函数,立即响应请求:
请求指定期望立即响应。
请求不期望有响应数据。
当前已经有足够的响应数据。
读取日志数据期间出错。
如果满足以上条件之一,则会立即触发执行回调函数(即 KafkaApis#handleFetchRequest
方法中定义的 sendResponseCallback 方法)响应请求,该函数已经在前面分析过,不再重复撰述。否则会构造 DelayedFetch 延时任务,并交由相应的炼狱进行管理( 步骤 4 )。
集群分区状态管理
Kafka 的所有 broker 节点在本地均使用 MetadataCache 缓存整个集群上所有 topic 分区的状态信息,并由 kafka controller 通过 UpdateMetadataRequest 请求进行维护。MetadataCache 的字段定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private [server] class MetadataCache (brokerId: Int ) extends Logging { private val cache = mutable.Map [String , mutable.Map [Int , PartitionStateInfo ]]() private var controllerId: Option [Int ] = None private val aliveBrokers = mutable.Map [Int , Broker ]() private val aliveNodes = mutable.Map [Int , collection.Map [ListenerName , Node ]]() }
ReplicaManager 提供了 ReplicaManager#maybeUpdateMetadataCache
方法用于处理 UpdateMetadataRequest 请求,该方法首先会校验请求中 kafka controller 的年代信息,以避免处理来自已经过期的 kafka controller 的请求,对于合法的请求则会调用 MetadataCache#updateCache
方法更新本地缓存的整个集群的 topic 分区状态信息。前面我们已经分析了 ReplicaManager#maybeUpdateMetadataCache
方法,但对于其中调用的 MetadataCache#updateCache
方法未展开分析,这里我们继续分析一下该方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 def updateCache (correlationId: Int , updateMetadataRequest: UpdateMetadataRequest ): Seq [TopicPartition ] = { inWriteLock(partitionMetadataLock) { controllerId = updateMetadataRequest.controllerId match { case id if id < 0 => None case id => Some (id) } aliveNodes.clear() aliveBrokers.clear() updateMetadataRequest.liveBrokers.asScala.foreach { broker => val nodes = new java.util.HashMap [ListenerName , Node ] val endPoints = new mutable.ArrayBuffer [EndPoint ] broker.endPoints.asScala.foreach { ep => endPoints += EndPoint (ep.host, ep.port, ep.listenerName, ep.securityProtocol) nodes.put(ep.listenerName, new Node (broker.id, ep.host, ep.port)) } aliveBrokers(broker.id) = Broker (broker.id, endPoints, Option (broker.rack)) aliveNodes(broker.id) = nodes.asScala } val deletedPartitions = new mutable.ArrayBuffer [TopicPartition ] updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) => val controllerId = updateMetadataRequest.controllerId val controllerEpoch = updateMetadataRequest.controllerEpoch if (info.leader == LeaderAndIsr .LeaderDuringDelete ) { this .removePartitionInfo(tp.topic, tp.partition) deletedPartitions += tp } else { val partitionInfo = this .partitionStateToPartitionStateInfo(info) this .addOrUpdatePartitionInfo(tp.topic, tp.partition, partitionInfo) } } deletedPartitions } }
MetadataCache 使用 MetadataCache#aliveBrokers
和 MetadataCache#aliveNodes
字段记录整个集群中可用的 broker 节点信息,当收到来自 kafka controller 的 UpdateMetadataRequest 请求时,MetadataCache 会清空本地缓存,并由请求信息重新构建新的可用的 broker 节点信息。此外还会依据 UpdateMetadataRequest 请求更新本地缓存的整个集群 topic 分区的状态信息(对应 MetadataCache#cache
字段)。
MetadataCache 提供了 MetadataCache#getTopicMetadata
方法用于获取本地缓存的指定 topic 的元数据信息,包括是否是内部 topic,以及对应 topic 下所有分区的元数据信息。方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 def getTopicMetadata (topics: Set [String ], listenerName: ListenerName , errorUnavailableEndpoints: Boolean = false ): Seq [MetadataResponse .TopicMetadata ] = { inReadLock(partitionMetadataLock) { topics.toSeq.flatMap { topic => this .getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints).map { partitionMetadata => new MetadataResponse .TopicMetadata (Errors .NONE , topic, Topic .isInternal(topic), partitionMetadata.toBuffer.asJava) } } } } private def getPartitionMetadata ( topic: String , listenerName: ListenerName , errorUnavailableEndpoints: Boolean ): Option [Iterable [MetadataResponse .PartitionMetadata ]] = { cache.get(topic).map { partitions => partitions.map { case (partitionId, partitionState) => val topicPartition = TopicAndPartition (topic, partitionId) val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr val maybeLeader = this .getAliveEndpoint(leaderAndIsr.leader, listenerName) val replicas = partitionState.allReplicas val replicaInfo = this .getEndpoints(replicas, listenerName, errorUnavailableEndpoints) maybeLeader match { case None => new MetadataResponse .PartitionMetadata (Errors .LEADER_NOT_AVAILABLE , partitionId, Node .noNode(), replicaInfo.asJava, java.util.Collections .emptyList()) case Some (leader) => val isr = leaderAndIsr.isr val isrInfo = this .getEndpoints(isr, listenerName, errorUnavailableEndpoints) if (replicaInfo.size < replicas.size) { new MetadataResponse .PartitionMetadata (Errors .REPLICA_NOT_AVAILABLE , partitionId, leader, replicaInfo.asJava, isrInfo.asJava) } else if (isrInfo.size < isr.size) { new MetadataResponse .PartitionMetadata (Errors .REPLICA_NOT_AVAILABLE , partitionId, leader, replicaInfo.asJava, isrInfo.asJava) } else { new MetadataResponse .PartitionMetadata (Errors .NONE , partitionId, leader, replicaInfo.asJava, isrInfo.asJava) } } } } }
方法 MetadataCache#getPartitionMetadata
会校验对应分区的 AR 集合和 ISR 集合中的副本是否可用,如果存在不可用的副本则会返回 REPLICA_NOT_AVAILABLE
错误,如果分区的副本均可用则会返回分区的元数据信息,包括分区 ID、leader 副本所在节点信息、AR 集合,以及 ISR 集合。
总结
本文我们分析了 Kafka 的分区副本实现机制,了解到 Kafka 会为每个 topic 分区设置多个副本,并基于 leader/follower 模式将这些副本分为一个 leader 角色和多个 follower 角色。在 topic 分区正常运行期间,由 leader 副本负责处理来自客户端的消息读写请求,而 follower 副本仅负责从 leader 副本同步消息数据。一旦 leader 副本失效,Kafka 会从位于 ISR 集合中的 follower 副本中选择一个成为新的 leader 副本,以保证对应的 topic 能够继续对外提供服务。
冗余策略在分布式计算和存储领域是一种简单且有效的可靠性保障措施,了解 Kafka 的分区副本实现机制能够指导我们更好的设计实现自己的分布式应用。