在 Kafka 的设计中,消费者一般都有一个 group 的概念(当然,也存在不属于任何 group 的消费者),将多个消费者组织成一个 group 可以提升消息的消费处理能力,同时又能保证消息消费的顺序性,不重复或遗漏消费。一个 group 名下的消费者包含一个 leader 角色和多个 follower 角色,虽然在消费消息方面这两类角色是等价的,但是 leader 角色相对于 follower 角色还担负着管理整个 group 的职责。当 group 中有新的消费者加入,或者某个消费者因为一些原因退出当前 group 时,亦或是订阅的 topic 分区发生变化时,都需要为 group 名下的消费者重新分配分区,在服务端确定好分区分配策略之后,具体执行分区分配的工作则交由 leader 消费者负责,并在完成分区分配之后将分配结果反馈给服务端。
前面在分析消费者运行机制时曾多次提到 GroupCoordinator 类,本篇我们就来分析一下 GroupCoordinator 组件的作用和实现。GroupCoordinator 组件主要功能包括对隶属于同一个 group 的消费者进行分区分配、维护内部 offset topic,以及管理消费者和消费者所属的 group 信息等。集群中的每一个 broker 节点在启动时都会创建并启动一个 GroupCoordinator 实例,每个实例都会管理集群中所有消费者 group 的一个子集。
GroupCoordinator 组件的定义与启动
GroupCoordinator 类的字段定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
classGroupCoordinator( val brokerId: Int, // 所属的 broker 节点的 ID val groupConfig: GroupConfig, // Group 配置对象,记录了 group 中 session 过期的最小时长和最大时长,即超时时长的合法区间 val offsetConfig: OffsetConfig, // 记录 OffsetMetadata 相关的配置项 val groupManager: GroupMetadataManager, // 负责管理 group 元数据以及对应的 offset 信息 val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat], // 管理 DelayedHeartbeat 延时任务的炼狱 val joinPurgatory: DelayedOperationPurgatory[DelayedJoin], // 管理 DelayedJoin 延时任务的炼狱 time: Time) extendsLogging{
private[coordinator] defcleanupGroupMetadata(): Unit = { this.cleanupGroupMetadata(None) }
defcleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]]) { val startMs = time.milliseconds() var offsetsRemoved = 0
// 遍历处理每个 group 对应的元数据信息 groupMetadataCache.foreach { case (groupId, group) => val (removedOffsets, groupIsDead, generation) = group synchronized { // 计算待移除的 topic 分区对应的 offset 元数据信息 val removedOffsets = deletedTopicPartitions match { // 从 group 元数据信息中移除指定的 topic 分区集合 caseSome(topicPartitions) => group.removeOffsets(topicPartitions) // 移除那些 offset 元数据已经过期的,且没有 offset 待提交的 topic 分区集合 caseNone => group.removeExpiredOffsets(startMs) }
// 如果 group 当前状态为 Empty,且名下 topic 分区所有的 offset 已经过期,则将该 group 状态切换成 Dead if (group.is(Empty) && !group.hasOffsets) { info(s"Group $groupId transitioned to Dead in generation ${group.generationId}") group.transitionTo(Dead) } (removedOffsets, group.is(Dead), group.generationId) }
// 获取 group 对应在 offset topic 中的分区编号 val offsetsPartition = partitionFor(groupId) val appendPartition = newTopicPartition(Topic.GroupMetadataTopicName, offsetsPartition) getMagic(offsetsPartition) match { // 对应 group 由当前 GroupCoordinator 进行管理 caseSome(magicValue) => val timestampType = TimestampType.CREATE_TIME val timestamp = time.milliseconds() // 获取当前 group 在 offset topic 中的分区对象 val partitionOpt = replicaManager.getPartition(appendPartition) partitionOpt.foreach { partition => // 遍历处理每个待移除的 topic 分区对应的 offset 元数据信息,封装成消息数据 val tombstones = removedOffsets.map { case (topicPartition, offsetAndMetadata) => trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata") val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition) Record.create(magicValue, timestampType, timestamp, commitKey, null) }.toBuffer trace(s"Marked ${removedOffsets.size} offsets in $appendPartition for deletion.")
// 如果当前 group 已经失效,则从本地移除对应的元数据信息,并将 group 信息封装成消息, // 如果 generation 为 0 则表示当前 group 仅仅使用 kafka 存储 offset 信息 if (groupIsDead && groupMetadataCache.remove(groupId, group) && generation > 0) { tombstones += Record.create(magicValue, timestampType, timestamp, GroupMetadataManager.groupMetadataKey(group.groupId), null) trace(s"Group $groupId removed from the metadata cache and marked for deletion in $appendPartition.") }
if (tombstones.nonEmpty) { try { // 往 offset topic 中追加消息,不需要 ack,如果失败则周期性任务稍后会重试 partition.appendRecordsToLeader(MemoryRecords.withRecords(timestampType, compressionType, tombstones: _*)) offsetsRemoved += removedOffsets.size trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted offsets and/or metadata for group $groupId") } catch { case t: Throwable => error(s"Failed to append ${tombstones.size} tombstones to $appendPartition for expired/deleted offsets and/or metadata for group $groupId.", t) } } }
caseNone => info(s"BrokerId $brokerId is no longer a coordinator for the group $groupId. Proceeding cleanup for other alive groups") } }
info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.") }
Group 状态定义与转换
GroupState 特质定义了 group 的状态,并由 GroupCoordinator 进行维护。围绕 GroupState 特质,Kafka 实现了 5 个样例对象,分别用于描述 group 的 5 种状态:
PreparingRebalance :表示 group 正在准备执行分区再分配操作。
AwaitingSync :表示 group 正在等待 leader 消费者的分区分配结果,新版本已更名为 CompletingRebalance。
Stable :表示 group 处于正常运行状态。
Dead :表示 group 名下已经没有消费者,且对应的元数据已经(或正在)被删除。
Empty :表示 group 名下已经没有消费者,并且正在等待记录的所有 offset 元数据过期。
Group 状态之间的转换以及转换原因如下图和表所示:
当前状态
目标状态
转换原因
PreparingRebalance
AwaitingSync
group 之前名下所有的消费者都已经申请加入,或者等待消费者申请加入超时。
PreparingRebalance
Empty
group 名下的所有消费者都已经离开。
PreparingRebalance
Dead
group 对应的元数据信息被移除。
AwaitingSync
Stable
group 收到来自 leader 消费者的分区分配结果。
AwaitingSync
PreparingRebalance
1. 有消费者申请加入或退出; 2. 名下消费者更新了元数据信息; 3. 名下消费者心跳超时。
AwaitingSync
Dead
group 对应的元数据信息被移除。
Stable
PreparingRebalance
1. 有消费者申请加入或退出; 2. 名下消费者心跳超时。
Stable
Dead
group 对应的元数据信息被移除。
Empty
PreparingRebalance
有消费者申请加入。
Empty
Dead
1. group 名下所有的 offset 元数据信息已经过期; 2. group 对应的元数据信息被移除。
privatedefonGroupLoaded(group: GroupMetadata) { group synchronized { info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}") assert(group.is(Stable) || group.is(Empty)) // 遍历更新当前 group 名下所有消费者的心跳信息 group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _)) } }
val startMs = time.milliseconds() // 获取并处理 topic 分区对应的 Log 对象 replicaManager.getLog(topicPartition) match { caseNone => // 不存在 warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log") caseSome(log) => var currOffset = log.logStartOffset val buffer = ByteBuffer.allocate(config.loadBufferSize)
// 记录 topic 分区与对应的 offset 信息映射关系 val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]() val removedOffsets = mutable.Set[GroupTopicPartition]()
// 记录 group 与对应的 group 元数据信息映射关系 val loadedGroups = mutable.Map[String, GroupMetadata]() val removedGroups = mutable.Set[String]()
// 遍历处理消息集合(深层迭代) MemoryRecords.readableRecords(bufferRead).deepEntries.asScala.foreach { entry => val record = entry.record require(record.hasKey, "Group metadata/offset entry key should not be null")
// 依据消息的 key 决定当前消息的类型 GroupMetadataManager.readMessageKey(record.key) match { // 如果是记录 offset 的消息 case offsetKey: OffsetKey => val key = offsetKey.key if (record.hasNullValue) { // 删除标记,则移除对应的 offset 信息 loadedOffsets.remove(key) removedOffsets.add(key) } else { // 非删除标记,解析并更新 key 对应 offset 信息 val value = GroupMetadataManager.readOffsetMessageValue(record.value) loadedOffsets.put(key, value) removedOffsets.remove(key) } // 如果是记录 group 元数据的消息 case groupMetadataKey: GroupMetadataKey => val groupId = groupMetadataKey.key val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value) if (groupMetadata != null) { // 非删除标记,记录加载的 group 元数据信息 trace(s"Loaded group metadata for group $groupId with generation ${groupMetadata.generationId}") removedGroups.remove(groupId) loadedGroups.put(groupId, groupMetadata) } else { // 删除标记 loadedGroups.remove(groupId) removedGroups.add(groupId) } // 未知的消息 key 类型 case unknownKey => thrownewIllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata") }
currOffset = entry.nextOffset } }
// 将在 offset topic 中存在 offset 信息的 topic 分区以是否在 offset topic 中包含 group 元数据信息进行区分 val (groupOffsets, emptyGroupOffsets) = loadedOffsets .groupBy(_._1.group) .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) }) .partition { case (group, _) => loadedGroups.contains(group) }
// 遍历处理在 offset topic 中存在 group 元数据信息的 group loadedGroups.values.foreach { group => val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, OffsetAndMetadata]) // 更新 group 对应的元数据信息,主要是更新名下每个 topic 分区对应的 offset 信息 loadGroup(group, offsets) onGroupLoaded(group) }
// 遍历处理在 offset topic 中不存在 group 元数据信息的 group,但是存在 offset 信息,新建一个 emptyGroupOffsets.foreach { case (groupId, offsets) => val group = newGroupMetadata(groupId) // 更新 group 对应的元数据信息,主要是更新名下每个 topic 分区对应的 offset 信息 loadGroup(group, offsets) onGroupLoaded(group) }
// 检测需要删除的 group 元数据信息,如果对应 group 在本地有记录且在 offset topic 中存在 offset 信息, // 则不应该删除,此类 group 一般仅依赖 kafka 存储 offset 信息,而不存储对应的 group 元数据信息 removedGroups.foreach { groupId => if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId)) thrownewIllegalStateException(s"Unexpected unload of active group $groupId while loading partition $topicPartition") }
} }
Offset topic 中主要记录了 group 的元数据和对应的 offset 的消费位置信息,上述方法会分别解析这两类数据并据此来恢复 GroupCoordinator 本地记录的对应 group 的元数据信息。如果 offset topic 中包含对应 group 的元数据信息则恢复时会直接复用,否则会创建一个空的 GroupMetadata 对象(这类 group 一般仅使用 Kafka 存储 offset 位置数据),并应用 GroupMetadataManager#loadGroup 方法更新 group 名下每个 topic 分区的 offset 值,同时将 group 元数据记录到 GroupCoordinator 本地缓存中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
privatedefloadGroup(group: GroupMetadata, offsets: Map[TopicPartition, OffsetAndMetadata]): Unit = { // 遍历处理每个 topic 分区的 offset 信息,兼容更新老版本的过期时间 val loadedOffsets = offsets.mapValues { offsetAndMetadata => // 对应老版本的 offset 元数据,设置过期时间戳为 commit 时间加上系统默认的保留时间(默认为 24 小时) if (offsetAndMetadata.expireTimestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP) offsetAndMetadata.copy(expireTimestamp = offsetAndMetadata.commitTimestamp + config.offsetsRetentionMs) else offsetAndMetadata } trace(s"Initialized offsets $loadedOffsets for group ${group.groupId}") // 更新 group 名下每个 topic 分区的 offset 信息 group.initializeOffsets(loadedOffsets)
// 更新 group 对应的元数据信息 val currentGroup = this.addGroup(group) if (group != currentGroup) debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed because there is already a cached group with generation ${currentGroup.generationId}") }
privatedefonGroupUnloaded(group: GroupMetadata) { group synchronized { info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}") val previousState = group.currentState // 将当前 group 切换成 Dead 状态 group.transitionTo(Dead)
当 GroupCoordinator 不再管理相应的 group 时,会将本地记录的 group 状态切换成 Dead,同时针对来自该 group 名下消费者的 JoinGroupRequest 请求均会响应 NOT_COORDINATOR_FOR_GROUP 错误。此外,还会从本地移除之前管理的 offset topic 分区对象,以及对应的 group 元数据信息,实现如下:
GroupCoordinator 实例在具体处理 JoinGroupRequest 请求之前,首先会执行一系列的校验操作以保证发送请求的消费者和目标 group 都是合法的,且对应的 GroupCoordinator 能够正常处理当前请求。如果目标 group 不存在,则在未指定对应的消费者 ID 时会首先新建 group,然后将当前消费者添加到对应 group 中开始执行分区再分配操作。方法 GroupCoordinator#doJoinGroup 会校验消费者 ID (如果指定的话)能否被当前 group 识别,以及消费者指定的分区分配策略能否被当前 group 支持,如果这些条件都不能满足,则没有必要再继续为该消费者分配分区,方法实现如下:
group synchronized { if (!group.is(Empty) // 消费者指定的分区分配策略,对应的 group 不支持 && (group.protocolType != Some(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) { responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code)) } elseif (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) { // 消费者 ID 不能够被识别 responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code)) } else { // 依据 group 的当前状态分别进行处理 group.currentState match { // 目标 group 已经失效 caseDead => // 对应的 group 的元数据信息已经被删除,说明已经迁移到其它 GroupCoordinator 实例或者不再可用,直接返回错误码 responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code)) // 目标 group 正在执行分区再均衡操作 casePreparingRebalance => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { // 对于未知 ID 的消费者申请加入,创建对应的元数据信息,并分配 ID,同时切换 group 的状态为 PreparingRebalance,准备执行分区再分配 this.addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback) } else { // 对于已知 ID 的消费者重新申请加入,更新对应的元数据信息,同时切换 group 的状态为 PreparingRebalance,准备执行分区再分配 val member = group.get(memberId) this.updateMemberAndRebalance(group, member, protocols, responseCallback) } // 目标 group 正在等待 leader 消费者的分区分配结果 caseAwaitingSync => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { // 对于未知 ID 的消费者申请加入,创建对应的元数据信息,并分配 ID,同时切换 group 的状态为 PreparingRebalance,准备执行分区再分配 this.addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback) } else { // 对于已知 ID 的消费者重新申请加入 val member = group.get(memberId) if (member.matches(protocols)) { // 分区分配策略未发生变化,返回 GroupMetadata 的信息 responseCallback(JoinGroupResult( members = if (memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty }, memberId = memberId, generationId = group.generationId, subProtocol = group.protocol, leaderId = group.leaderId, errorCode = Errors.NONE.code)) } else { // 分区分配策略发生变化,更新对应的元数据信息,同时切换 group 的状态为 PreparingRebalance,准备执行分区再分配 this.updateMemberAndRebalance(group, member, protocols, responseCallback) } } // 目标 group 运行正常,或者正在等待 offset 过期 caseEmpty | Stable => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { // 对于未知 ID 的消费者申请加入,创建对应的元数据信息,并分配 ID,同时切换 group 的状态为 PreparingRebalance,准备执行分区再分配 this.addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback) } else { // 对于已知 ID 的消费者重新申请加入 val member = group.get(memberId) if (memberId == group.leaderId || !member.matches(protocols)) { // 当前消费者是 group leader 或支持的分区分配策略发生变化,更新对应的元数据信息,同时切换 group 的状态为 PreparingRebalance,准备执行分区再分配 this.updateMemberAndRebalance(group, member, protocols, responseCallback) } else { // 分区分配策略未发生变化,返回 GroupMetadata 信息 responseCallback(JoinGroupResult( members = Map.empty, memberId = memberId, generationId = group.generationId, subProtocol = group.protocol, leaderId = group.leaderId, errorCode = Errors.NONE.code)) } } }
// 如果当前 group 正在准备执行分区再分配,尝试执行 DelayedJoin 延时任务 if (group.is(PreparingRebalance)) joinPurgatory.checkAndComplete(GroupKey(group.groupId)) } } }
对于满足条件的消费者来说,需要依据 group 的当前运行状态分而治之。如果当前 group 的状态为 Dead,则说明对应的 group 不再可用,或者已经由其它 GroupCoordinator 实例管理,直接响应 UNKNOWN_MEMBER_ID 错误,消费者可以再次请求获取新接管的 GroupCoordinator 实例所在的位置信息。
如果当前 group 的状态为 PreparingRebalance,则说明对应的 group 正在准备执行分区再分配操作,此时:
对于新加入的消费者(未指定 ID),首先需要为其创建消费者 ID 和元数据信息,并交由目标 group 进行管理,然后开始执行分区再分配操作。
// 将 group 状态切换成 PreparingRebalance 状态,准备执行分区再分配操作 group.transitionTo(PreparingRebalance) info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
// 分区再均衡超时时长是所有消费者设置的超时时长的最大值 val rebalanceTimeout = group.rebalanceTimeoutMs // 创建 DelayedJoin 延时任务,用于等待消费者申请加入当前 group val delayedRebalance = newDelayedJoin(this, group, rebalanceTimeout) val groupKey = GroupKey(group.groupId) // 关注当前 group // 将延时任务添加到炼狱中进行管理 joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey)) }
上述方法首先会校验 group 当前状态是不是 AwaitingSync,如果是则说明当前 GroupCoordinator 实例正在等待 leader 消费者的分区分配的结果,此时如果有来自 follower 消费者的 SyncGroupRequest 请求,则直接响应 REBALANCE_IN_PROGRESS 错误,同时需要清空 group 名下所有消费者记录的分区分配信息。然后切换 group 的状态为 PreparingRebalance,表示开始准备执行分区再分配,并创建 DelayedJoin 延时任务等待 group 名下所有消费者发送 JoinGroupRequest 请求申请加入当前 group。
defprepareStoreGroup(group: GroupMetadata, groupAssignment: Map[String, Array[Byte]], responseCallback: Errors => Unit): Option[DelayedStore] = { // 依据 group 对应 offset topic 分区的消息版本进行处理 getMagic(partitionFor(group.groupId)) match { caseSome(magicValue) => val groupMetadataValueVersion = { if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0) 0.toShort elseGroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION }
val timestampType = TimestampType.CREATE_TIME val timestamp = time.milliseconds() // 创建记录 GroupMetadata 信息的消息,其中 value 是分区的分配结果 val record = Record.create(magicValue, timestampType, timestamp, GroupMetadataManager.groupMetadataKey(group.groupId), GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion))
// 获取 group 对应的 offset topic 分区对象 val groupMetadataPartition = newTopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId)) // 构造 offset topic 分区与消息集合的映射关系 val groupMetadataRecords = Map(groupMetadataPartition -> MemoryRecords.withRecords(timestampType, compressionType, record)) val generationId = group.generationId
defputCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) { if (responseStatus.size != 1 || !responseStatus.contains(groupMetadataPartition)) thrownewIllegalStateException("Append status %s should only have one partition %s".format(responseStatus, groupMetadataPartition))
// 获取消息追加响应结果 val status = responseStatus(groupMetadataPartition)
val responseError = if (status.error == Errors.NONE) { // 追加成功 Errors.NONE } else { // ... 追加异常,对错误码执行一些转换操作,省略 }
// 执行回调函数 responseCallback(responseError) }
当消息被追加到 offset topic 中之后会依据消息的追加结果封装成对应的错误码,并回调 responseCallback 方法,这是一个 Errors => Unit 的函数,在本步骤中该函数只是简单的在追加失败时打印一行警告日志,毕竟追加的消息本来就是空的。
defhandleLeaveGroup(groupId: String, memberId: String, responseCallback: Short => Unit) { if (!isActive.get) { // GroupCoordinator 实例未启动 responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code) } elseif (!isCoordinatorForGroup(groupId)) { // 当前 GroupCoordinator 实例并不负责管理当前 group responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code) } elseif (isCoordinatorLoadingInProgress(groupId)) { // 当前 GroupCoordinator 实例正在加载该 group 对应的 offset topic 分区信息 responseCallback(Errors.GROUP_LOAD_IN_PROGRESS.code) } else { groupManager.getGroup(groupId) match { // 对应的 group 不存在或已经失效 caseNone => responseCallback(Errors.UNKNOWN_MEMBER_ID.code) caseSome(group) => group synchronized { if (group.is(Dead) || !group.has(memberId)) { responseCallback(Errors.UNKNOWN_MEMBER_ID.code) } else { val member = group.get(memberId) // 设置 MemberMetadata#isLeaving 为 true,并尝试完成对应的 DelayedHeartbeat 延时任务 this.removeHeartbeatForLeavingMember(group, member) // 从 group 元数据信息中移除对应的 MemberMetadata 对象,并切换状态 this.onMemberFailure(group, member) // 调用回调响应函数 responseCallback(Errors.NONE.code) } } } } }
如果发送 LeaveGroupRequest 请求的消费者所属的 group 存在且运行正常,则服务端首先会将对应消费者元数据信息的 MemberMetadata#isLeaving 字段设置为 true,标识当前消费者已经离线,并尝试触发关注当前消费者的 DelayedHeartbeat 延时任务。此外,还会将该消费者从之前所属的 group 元数据信息中移除,并依据 group 当前的状态决定是触发分区再分配操作,还是触发执行关注该 group 的 DelayedJoin 延时任务,相关实现位于 GroupCoordinator#onMemberFailure 方法中,前面已经分析过该方法,这里不再重复撰述。
总结
本文我们分析了 GroupCoordinator 组件的作用和实现,该组件与消费者之间关系密切,消费者在运行期间除了从 ReplicaManager 组件拉取消息进行消费,剩余的交互基本都由 GroupCoordinator 组件负责处理。Kafka 依赖该组件对消费者所属的 group 实施管理,并对 group 名下的消费者进行协调,主要提供了分区分配与再平衡支持、记录 group 的消费 offset 位置信息,以及维护与消费者之间的心跳等功能。此外,GroupCoordinator 内置了故障转移机制,以保证在 topic offset 对应分区 leader 副本失效时,能够切换到新的 GroupCoordinator 实例继续对外提供服务。