Kafka 源码解析:日志数据存储机制

日志数据(亦称消息数据)的存储机制在 Kafka 整个设计与实现中既基础又核心。Kafka 采用本地文件系统对日志数据进行存储,并允许为一个 broker 节点设置多个 log 文件目录,每个 log 目录下存储的数据又按照 topic 分区进行划分,其中包含了一个 topic 分区名下消息数据对应的多组日志和索引文件。

Kafka 定义了 LogSegment 类和 Log 类对日志和索引数据进行管理,并定义了 LogManager 类管理一个 broker 节点下的所有 Log 对象,同时基于 Log 对象提供了对日志数据的加载、创建、删除,以及查询等功能,同时还维护了多个定时任务对日志数据执行清理、删除、刷盘,以及记录 HW 位置等操作,并提供了对 key 重复的消息数据执行压缩的机制。

image

上图展示了 topic、partition、replica、Log 和 LogSegment 之间的组织关系。在具体实现时组织如下:

  • 一个 broker 节点允许指定多个 log 目录,每个目录下包含多个以“topic-partition”命名的目录,即一个 log 目录下存储了多个 topic 分区对应的消息数据,并且一个 topic 分区只允许属于一个 log 目录。
  • 每个 topic 分区目录下包含多组日志(log)和索引(index、timeindex)文件,Kafka 定义了 LogSegment 类用于封装一组日志和索引文件。
  • 每个 topic 分区对应一个 Log 类对象(一个 broker 节点上只允许存放分区的一个副本,所以从 broker 视角来看一个分区对应一个 Log 类对象),其中包含了一系列隶属对应 topic 分区的 LogSegment 对象,Log 类采用跳跃表(SkipList)数据结构对这些 LogSegment 对象进行管理。

image

上图进一步展示了 Log 与 LogSegment 之间的组织关系,以及 LogSegment 在 Log 中基于 SkipList 的组织形式(其中青色小圆圈表示单个 LogSegment 对象)。

LogSegment 组件

每个 topic 分区目录下通常会包含多个 log 文件,这些 log 文件 以其中保存的消息的起始 offset 命名 。每个 log 文件由一个 LogSegment 对象进行管理,其中还包含了对应的 index 和 timeindex 文件。下面是关于某个 topic 分区目录下的文件列表(生产环境中一个 topic 分区目录下一般存在多组类似下面这样的文件):

1
2
$ ls topic-default-0/
00000000000000000122.index 00000000000000000122.log 00000000000000000122.timeindex

LogSegment 类的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class LogSegment(val log: FileRecords, // log 文件对象
val index: OffsetIndex, // index 文件对象
val timeIndex: TimeIndex, // timeindex 文件对象
val baseOffset: Long, // 当前日志分片文件中第一条消息的 offset 值
val indexIntervalBytes: Int, // 索引项之间间隔的最小字节数,对应 index.interval.bytes 配置
val rollJitterMs: Long,
time: Time) extends Logging {

/** 当前 LogSegment 的创建时间 */
private var created = time.milliseconds
/** 自上次添加索引项后,在 log 文件中累计加入的消息字节数 */
private var bytesSinceLastIndexEntry = 0
/** The timestamp we used for time based log rolling */
private var rollingBasedTimestamp: Option[Long] = None
/** 已追加消息的最大时间戳 */
@volatile private var maxTimestampSoFar = timeIndex.lastEntry.timestamp
/** 已追加的具备最大时间戳的消息对应的 offset */
@volatile private var offsetOfMaxTimestamp = timeIndex.lastEntry.offset

// ... 省略方法定义

}

其中 FileRecords 类用于封装和管理对应的 log 文件,OffsetIndex 类用于封装和管理对应的 index 文件,TimeIndex 类用于封装和管理对应的 timeindex 文件。这是支撑 Kafka 日志数据存储的 3 个基础类,要理解 Kafka 的日志存储机制,我们需要先理解这 3 个类的定义。

  • FileRecords

FileRecords 类用于描述和管理日志(分片)文件数据,对应一个 log 文件,其字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class FileRecords extends AbstractRecords implements Closeable {

/** 标识是否为日志文件分片 */
private final boolean isSlice;
/** 分片的起始位置 */
private final int start;
/** 分片的结束位置 */
private final int end;
/** 浅层拷贝 */
private final Iterable<FileChannelLogEntry> shallowEntries;
/** 如果是分片则表示分片的大小(end - start),如果不是分片则表示整个日志文件的大小 */
private final AtomicInteger size;
/** 读写对应的日志文件的通道 */
private final FileChannel channel;
/** 日志文件对象 */
private volatile File file;

// ... 省略方法定义

}

FileRecords 主要定义了对日志数据的追加、读取、删除、查找、截断,以及刷盘等操作,并依赖于 LogEntry 类对单条日志数据的 offset 和 value 进行封装,同时提供了对 log 文件中日志数据的 浅层遍历深层遍历 操作。日志数据在追加到 log 文件中之前可能会执行压缩操作,所谓浅层遍历是指在遍历 log 文件中的日志数据时将压缩后的数据看做是一个整体,而深层遍历则会尝试对这部分日志数据执行解压缩,并返回解压缩后的单条消息。

下面的示例中展示了一个具体的日志文件数据的部分内容(即前面提及的 00000000000000000122.log 文件):

1
2
3
4
5
6
7
8
9
10
11
12
13
LogEntry(122, Record(magic = 1, attributes = 0, compression = NONE, crc = 300223964, CreateTime = 1553937143494, key = 4 bytes, value = 16 bytes))
LogEntry(123, Record(magic = 1, attributes = 0, compression = NONE, crc = 1516889930, CreateTime = 1553937143505, key = 4 bytes, value = 16 bytes))
LogEntry(124, Record(magic = 1, attributes = 0, compression = NONE, crc = 1201423931, CreateTime = 1553937143507, key = 4 bytes, value = 16 bytes))
LogEntry(125, Record(magic = 1, attributes = 0, compression = NONE, crc = 1592544380, CreateTime = 1553937143507, key = 4 bytes, value = 16 bytes))
LogEntry(126, Record(magic = 1, attributes = 0, compression = NONE, crc = 599198486, CreateTime = 1553937143508, key = 4 bytes, value = 16 bytes))
LogEntry(127, Record(magic = 1, attributes = 0, compression = NONE, crc = 980691361, CreateTime = 1553937143509, key = 4 bytes, value = 16 bytes))
LogEntry(128, Record(magic = 1, attributes = 0, compression = NONE, crc = 4047753804, CreateTime = 1553937143511, key = 4 bytes, value = 16 bytes))
LogEntry(129, Record(magic = 1, attributes = 0, compression = NONE, crc = 4289660679, CreateTime = 1553937143511, key = 4 bytes, value = 16 bytes))
LogEntry(130, Record(magic = 1, attributes = 0, compression = NONE, crc = 4016824904, CreateTime = 1553937143512, key = 4 bytes, value = 16 bytes))
LogEntry(131, Record(magic = 1, attributes = 0, compression = NONE, crc = 3305927143, CreateTime = 1553937143512, key = 4 bytes, value = 16 bytes))
LogEntry(132, Record(magic = 1, attributes = 0, compression = NONE, crc = 3847705666, CreateTime = 1553937143513, key = 4 bytes, value = 16 bytes))

...

上面的示例中我们基于深层遍历调用 LogEntry#toString 方法打印了单条消息的概要信息。

  • OffsetIndex

OffsetIndex 类用于描述和管理索引文件数据,定义了对 index 文件的检索、追加,以及截断等功能。一个 OffsetIndex 对象对应一个 index 文件,用于提高消息检索的性能。下面的示例中展示了一个具体的 index 文件数据的部分内容(即前面提及的 00000000000000000122.index 文件):

1
2
3
4
5
6
7
8
9
10
11
12
165, 8910
252, 13608
355, 19170
658, 35532
961, 51894
1191, 64314
1494, 80676
1797, 97038
2100, 113400
2403, 129762

...

OffsetIndex 的索引项由 8 个字节构成,其中前面 4 个字节表示消息的相对 offset,后面 4 个字节表示消息所在文件的物理地址(position),其中相对 offset 参考的偏移量是对应文件的起始 offset,这样的设计将原本 long 类型(8 字节)的消息 offset 转换成 int 类型(4 字节)的相对 offset 进行存储,能够减少空间占用。此外,Kafka 在构造 index 文件(包括下面要介绍的 timeindex 文件)时并不会针对每个 offset 都建立对应的索引项,而是采用隔一段区间打一个点的稀疏索引机制,以进一步减少对磁盘空间的消耗。

  • TimeIndex

TimeIndex 类同样用于描述和管理索引文件数据,提供了基于时间戳检索日志数据的功能,对应 timeindex 文件。区别于 OffsetIndex 的地方在于 TimeIndex 的索引项由 12 个字节构成,其中前面 8 个字节表示当前 offset 之前已追加消息的最大时间戳(毫秒),后面 4 个字节表示相对 offset,等价于 OffsetIndex 索引项的前 4 个字节。下面的示例中展示了一个具体的 timeindex 文件数据的部分内容(即前面提及的 00000000000000000122.timeindex 文件):

1
2
3
4
5
6
7
8
9
10
11
12
1553937143565, 251
1553937143570, 284
1553937143594, 649
1553937143609, 944
1553937143631, 1166
1553937143652, 1483
1553937143669, 1770
1553937143691, 2096
1553937143707, 2378
1553937143714, 2676

...

LogSegment 可以看做是对一组日志和索引文件数据的封装,并提供了对这些数据执行追加、读取、截断、删除、刷盘,以及重建等功能。本小节接下来的内容,我们重点分析一下 LogSegment 中主要的日志和索引文件数据操作方法,包括:LogSegment#appendLogSegment#readLogSegment#recover 方法,其它方法在实现上都比较简单,读者要是感兴趣的话可以自己阅读源码。

追加日志数据

本小节来看一下 LogSegment#append 方法的实现,该方法用于往当前 LogSegment 对应的 log 文件中追加消息数据,并在需要时更新对应的 index 和 timeindex 索引数据。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def append(firstOffset: Long, // 待追加消息的起始 offset
largestOffset: Long, // 待追加消息中的最大 offset
largestTimestamp: Long, // 待追加消息中的最大时间戳
shallowOffsetOfMaxTimestamp: Long, // 最大时间戳消息对应的 offset
records: MemoryRecords) { // 待追加的消息数据
if (records.sizeInBytes > 0) {
trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
.format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))
// 获取物理位置(当前分片的大小)
val physicalPosition = log.sizeInBytes()
if (physicalPosition == 0) rollingBasedTimestamp = Some(largestTimestamp)

require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")

// 将消息数据追加到 log 文件
val appendedBytes = log.append(records)
trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")

// 更新已追加的消息对应的最大时间戳,及其 offset
if (largestTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = largestTimestamp
offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
}

// 如果当前累计追加的日志字节数超过阈值(对应 index.interval.bytes 配置)
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
// 更新 index 和 timeindex 文件
index.append(firstOffset, physicalPosition)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
bytesSinceLastIndexEntry = 0 // 重置当前累计追加的日志字节数
}
// 更新累计加入的日志字节数
bytesSinceLastIndexEntry += records.sizeInBytes
}
}

如果当前追加的消息数据是有效的,则 LogSegment 会调用 FileRecords#append 方法将消息数据追加到对应的 log 文件中,并更新本地记录的已追加消息的最大时间戳及其 offset。前面我们介绍了 Kafka 并不会对每条消息都建立索引,而是采用稀疏索引的策略间隔指定大小的字节数(对应 index.interval.bytes 配置)建立索引项,如果当前累计追加的消息字节数超过该配置值,则 Kafka 会更新对应的 index 和 timeindex 数据。

读取日志数据

下面来看一下 LogSegment#read 方法,该方法用于从 LogSegment 对应的 log 文件中读取指定区间的消息数据,读取的消息内容由 startOffset、maxOffset、maxSize 和 maxPosition 这 4 个参数确定。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def read(startOffset: Long, // 读取消息的起始 offset
maxOffset: Option[Long], // 读取消息的结束 offset
maxSize: Int, // 读取消息的最大字节数
maxPosition: Long = size, // 读取消息的最大物理地址
minOneMessage: Boolean = false): FetchDataInfo = {

if (maxSize < 0)
throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

// 获取当前 log 文件的字节大小
val logSize = log.sizeInBytes // this may change, need to save a consistent copy
// 获取小于等于 startOffset 的最大 offset 对应的物理地址 position
val startOffsetAndSize = this.translateOffset(startOffset)

// 如果读取的位置超出了当前文件,直接返回 null
if (startOffsetAndSize == null) return null

val startPosition = startOffsetAndSize.position // 起始 position
val offsetMetadata = new LogOffsetMetadata(startOffset, baseOffset, startPosition)

// 更新读取消息的最大字节数
val adjustedMaxSize = if (minOneMessage) math.max(maxSize, startOffsetAndSize.size) else maxSize
// 如果请求读取的消息最大字节数为 0,则返回一个空的结果对象
if (adjustedMaxSize == 0)
return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

// 计算待读取的字节数
val length = maxOffset match {
// 如果未指定读取消息的结束位置
case None =>
// 直接读取到指定的最大物理地址
min((maxPosition - startPosition).toInt, adjustedMaxSize)
// 如果指定了读取消息的结束位置
case Some(offset) =>
// 如果结束位置小于起始位置,则直接返回一个空的结果对象
if (offset < startOffset)
return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
// 将结束位置 offset 转换成对应的物理地址
val mapping = this.translateOffset(offset, startPosition)
// 如果结束位置 maxOffset 超出当前日志文件,则使用日志文件长度
val endPosition = if (mapping == null) logSize else mapping.position
// 由 maxOffset、maxPosition,以及 maxSize 共同决定最终读取长度
min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
}

// 读取对应的消息数据,并封装成 FetchDataInfo 对象返回
FetchDataInfo(
offsetMetadata,
log.read(startPosition, length),
firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}

上述方法的主要逻辑在于确定读取消息的起始位置和读取长度,并最终需要调用 FileRecords#read 方法读取消息数据,该方法接收 2 个参数:position 和 size。参数 position 指代读取消息的起始物理地址,而 size 指代读取消息的字节数,而上述方法的主要逻辑就在于基于参数给定的 4 个坐标来确定 position 和 size 值。

参数 startOffset 设置了当前要读取的消息的起始相对 offset,而 position 是物理地址,所以需要调用 LogSegment#translateOffset 方法进行转换,该方法基于 二分查找算法 从 index 文件中获取小于等于 startOffset 的最大 offset 对应的物理地址。实现如下:

1
2
3
4
5
6
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogEntryPosition = {
// 基于二分查找获取小于等于参数 offset 的最大 offset,返回 offset 与对应的物理地址
val mapping = index.lookup(offset)
// 查找对应的物理地址 position
log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
}

确定好读取的起始物理地址之后,接下来就需要计算读取的消息字节数 size 值,另外 3 个参数(maxOffset、maxSize 和 maxPosition)用来约束生成 size 值,策略如下:

  1. 如果未指定 maxOffset,则 size 等于 max((maxPosition - startPosition), maxSize)
  2. 如果指定了 maxOffset,需要保证 maxOffset 大于等于 startOffset,然后获取 maxOffset 对应的物理地址,并将该物理地址与 maxPosition 进行比较,选择较小的一个与 startPosition 计算得到对应的 size 值,并保证该 size 值不超过 maxSize。

如果能够基于参数计算得到正确的 position 和 size 值,则方法会依据这两个值调用 FileRecords#read 方法读取对应的消息数据,并封装成 FetchDataInfo 对象返回。

重建索引数据

最后来看一下 LogSegment#recover 方法,该方法用于对 log 文件重建相应的 index 和 timeindex 文件,并校验 log 中数据的有效性。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def recover(maxMessageSize: Int): Int = {
// 清空 index 和 timeindex 文件
index.truncate()
index.resize(index.maxIndexSize)
timeIndex.truncate()
timeIndex.resize(timeIndex.maxIndexSize)
var validBytes = 0 // 记录通过验证的字节数
var lastIndexEntry = 0 // 最后一个索引项对应的物理地址
maxTimestampSoFar = Record.NO_TIMESTAMP
try {
// 遍历 log 文件,重建索引
for (entry <- log.shallowEntries(maxMessageSize).asScala) {
// 获取对应的消息 Record 对象
val record = entry.record
// 校验消息数据的有效性,如果存在问题则抛出异常
record.ensureValid()

// 更新本地记录的消息最大时间戳及其 offset 值
if (record.timestamp > maxTimestampSoFar) {
maxTimestampSoFar = record.timestamp
offsetOfMaxTimestamp = entry.offset
}

// 如果当前字节减去上一次记录索引的字节超过设置的索引项之间间隔的最小字节数,则添加索引项
if (validBytes - lastIndexEntry > indexIntervalBytes) {
val startOffset = entry.firstOffset
index.append(startOffset, validBytes)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
lastIndexEntry = validBytes
}
validBytes += entry.sizeInBytes()
}
} catch {
case e: CorruptRecordException =>
logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
}
// 截断日志和索引文件中无效的字节
val truncated = log.sizeInBytes - validBytes
log.truncateTo(validBytes)
index.trimToValidSize()
// A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
timeIndex.trimToValidSize()
truncated
}

重建的过程实际上就是遍历 log 文件,并依据设置的索引项最小间隔字节数(对应 index.interval.bytes 配置)区间建立稀疏索引,期间会基于 Record#ensureValid 方法采用 CRC 校验消息数据的有效性,如果存在无效的数据,则退出循环并移除之后的日志和索引。

Log 组件

在一个 log 目录下存在多个以“topic-partition”命名的分区目录,每个 topic 分区对应一个 Log 对象(更准确来说是一个分区副本对应一个 Log 对象),用于管理名下的 LogSegment 对象集合, Log 类使用 SkipList 数据结构对 LogSegment 进行组织和管理 。在 SkipList 中以 LogSegment 的 baseOffset 为 key,以 LogSegment 对象自身作为 value。当读取消息数据时,我们可以基于 offset 快速定位到对应的 LogSegment 对象,然后调用 LogSegment#read 方法读取消息数据。当写入消息时,Kafka 并不允许向 SkipList 中的任意一个 LogSegment 对象追加数据,而只允许往 SkipList 中的最后一个 LogSegment 追加数据,Log 类提供了 Log#activeSegment 用于获取该 LogSegment 对象,称之为 activeSegment。

Log 类的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Log(@volatile var dir: File, // 当前 Log 对象对应的 topic 分区目录
@volatile var config: LogConfig, // 配置信息
@volatile var recoveryPoint: Long = 0L, // 恢复操作的起始 offset,即 HW 位置,之前的消息已经全部落盘
scheduler: Scheduler, // 定时任务调度器
time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {

/** 最近一次执行 flush 操作的时间 */
private val lastflushedTime = new AtomicLong(time.milliseconds)
/**
* 用于记录分配给当前消息的 offset,也是当前副本的 LEO 值:
* - messageOffset 记录了当前 Log 对象下一条待追加消息的 offset 值
* - segmentBaseOffset 记录了 activeSegment 对象的 baseOffset
* - relativePositionInSegment 记录了 activeSegment 对象的大小
*/
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
/**
* 当前 Log 包含的 LogSegment 集合,SkipList 结构:
* - 以 baseOffset 作为 key
* - 以 LogSegment 对象作为 value
*/
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
/** 基于 topic 分区目录解析得到对应的 topic 分区对象 */
val topicPartition: TopicPartition = Log.parseTopicPartitionName(dir)
private val tags = Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString)
/** 当前 Log 对象对应的分区目录名称 */
def name: String = dir.getName

// ... 省略方法定义

}

初始化加载日志数据

Log 类在实例化时会调用 Log#loadSegments 方法加载对应 topic 分区目录下的 log、index 和 timeindex 文件。该方法主要做了以下 4 件事情:

  1. 删除标记为 deleted 或 cleaned 的文件,将标记为 swap 的文件加入到交换集合中,等待后续继续完成交换过程;
  2. 加载 topic 分区目录下全部的 log 文件和 index 文件,如果对应的 index 不存在或数据不完整,则重建;
  3. 遍历处理 1 中记录的 swap 文件,使用压缩后的 LogSegment 替换压缩前的 LogSegment 集合,并删除压缩前的日志和索引文件;
  4. 后处理,如果对应 SkipList 为空则新建一个空的 activeSegment,如果不为空则校验 recoveryPoint 之后数据的完整性。

方法 Log#loadSegments 的实现比较冗长,下面我们分步骤逐一分析各个过程,首先来看 步骤 1 ,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 1. 删除标记为 deleted 或 cleaned 的文件,将标记为 swap 的文件加入到交换集合中,等待后续继续完成交换过程
for (file <- dir.listFiles if file.isFile) {
if (!file.canRead) throw new IOException("Could not read file " + file)
val filename = file.getName
// 如果是标记为 deleted 或 cleaned 的文件,则删除:
// - 其中 deleted 文件是指标识需要被删除的 log 文件或 index 文件
// - 其中 cleaned 文件是指在执行日志压缩过程中宕机,文件中的数据状态不明确,无法正确恢复的文件
if (filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
file.delete()
}
// 如果是标记为 swap 的文件(可用于交换的临时文件),则说明日志压缩过程已完成,但是在执行交换过程中宕机,
// 因为 swap 文件已经保存了日志压缩后的完整数据,可以进行恢复:
// 1. 如果 swap 文件是 log 文件,则删除对应的 index 文件,稍后 swap 操作会重建索引
// 2. 如果 swap 文件是 index 文件,则直接删除,后续加载 log 文件时会重建索引
else if (filename.endsWith(SwapFileSuffix)) {
// 移除 swap 后缀
val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
// 如果是 index 文件,则直接删除,因为后续可以重建
if (baseName.getPath.endsWith(IndexFileSuffix)) {
file.delete()
}
// 如果是 log 文件,则删除对应的 index 文件
else if (baseName.getPath.endsWith(LogFileSuffix)) {
val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
index.delete()
swapFiles += file // 将当前文件加入到 swap 集合中
}
}
}

这一步会遍历当前 topic 分区目录下的文件,并处理标记为 deleted、cleaned 和 swap 的文件(以这些名称作为文件后缀名)。这 3 类文件对应的含义为:

  • deleted 文件 :标识需要被删除的 log 文件和 index 文件。
  • cleaned 文件 :在执行日志压缩过程中宕机,文件中的数据状态不明确,无法正确恢复的文件。
  • swap 文件 :完成执行日志压缩后的文件,但是在替换原文件时宕机。

针对 deleted 和 cleaned 文件直接删除即可,对于 swap 文件来说,因为其中的数据是完整的,所以可以继续使用,只需再次完成 swap 操作即可。Kafka 针对 swap 文件的处理策略为:

  1. 如果 swap 文件是 log 文件,则删除对应的 index 文件,稍后的 swap 操作会重建索引。
  2. 如果 swap 文件是 index 文件,则直接删除,后续加载 log 文件时会重建索引。

完成了对于一些异常状态文件的处理,步骤 2 开始真正执行加载 log 和 index 文件的操作,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// 2. 加载 topic 分区目录下全部的 log 文件和 index 文件,如果对应的 index 文件不存在或数据不完整,则重建
for (file <- dir.listFiles if file.isFile) {
val filename = file.getName
// 处理 index 和 timeindex 文件
if (filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix)) {
// 如果索引文件没有对应的 log 文件,则删除 index 文件
val logFile =
if (filename.endsWith(TimeIndexFileSuffix))
new File(file.getAbsolutePath.replace(TimeIndexFileSuffix, LogFileSuffix))
else
new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
if (!logFile.exists) {
warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
file.delete()
}
}
// 处理 log 文件
else if (filename.endsWith(LogFileSuffix)) {
// 获取 baseOffset 值
val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
// 创建对应的 index 文件对象
val indexFile = Log.indexFilename(dir, start)
// 创建对应的 timeindex 文件对象
val timeIndexFile = Log.timeIndexFilename(dir, start)
val indexFileExists = indexFile.exists()
val timeIndexFileExists = timeIndexFile.exists()

// 创建对应的 LogSegment 对象
val segment = new LogSegment(
dir = dir,
startOffset = start,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time,
fileAlreadyExists = true)

// 如果对应的 index 文件存在,则校验数据完整性,如果不完整则重建
if (indexFileExists) {
try {
// 校验 index 文件的完整性
segment.index.sanityCheck()
// 如果对应的 timeindex 文件不存在,则重置对应的 mmb 对象
if (!timeIndexFileExists)
segment.timeIndex.resize(0)
// 校验 timeindex 文件的完整性
segment.timeIndex.sanityCheck()
} catch {
// 索引文件完整性异常,删除重建
case e: java.lang.IllegalArgumentException =>
warn(s"Found a corrupted index file due to ${e.getMessage}}. deleting ${timeIndexFile.getAbsolutePath}, " + s"${indexFile.getAbsolutePath} and rebuilding index...")
indexFile.delete()
timeIndexFile.delete()
segment.recover(config.maxMessageSize)
}
}
// 如果对应的 index 文件不存在,则重建
else {
error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
segment.recover(config.maxMessageSize)
}
// 记录 LogSegment 对象到 segments 集合中
segments.put(start, segment)
}
}

如果当前文件是 index 文件,但对应的 log 文件不存在,则直接删除,因为没有继续保留的意义。如果当前是 log 文件,则这一步会创建 log 文件对应的 LogSegment 对象并记录到 SkipList 中。期间会校验 log 文件对应的 index 和 timeindex 文件,如果索引文件不存在或其中的数据不完整,则会调用前面介绍的 LogSegment#recover 方法重建索引。

步骤 1 中将需要继续执行 swap 操作的文件记录到了 swapFiles 集合中, 步骤 3 的逻辑就是继续完成 swap 操作,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 3. 遍历处理步骤 1 中记录的 swap 文件,使用压缩后的 LogSegment 替换压缩前的 LogSegment 集合,并删除压缩前的日志和索引文件
for (swapFile <- swapFiles) {
// 移除 “.swap” 后缀
val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
val fileName = logFile.getName
// 基于 log 文件名得到对应的 baseOffset 值
val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong
val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix) // .index.swap
val index = new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
val timeIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TimeIndexFileSuffix) + SwapFileSuffix) // .timeindex.swap
val timeIndex = new TimeIndex(timeIndexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
// 创建对应的 LogSegment 对象
val swapSegment = new LogSegment(FileRecords.open(swapFile),
index = index,
timeIndex = timeIndex,
baseOffset = startOffset,
indexIntervalBytes = config.indexInterval,
rollJitterMs = config.randomSegmentJitter,
time = time)
info("Found log file %s from interrupted swap operation, repairing.".format(swapFile.getPath))
// 依据 log 文件重建索引文件,同时校验 log 文件中消息的合法性
swapSegment.recover(config.maxMessageSize)
// 查找 swapSegment 获取 [baseOffset, nextOffset] 区间对应的日志压缩前的 LogSegment 集合,
// 区间中的 LogSegment 数据都压缩到了 swapSegment 中
val oldSegments = this.logSegments(swapSegment.baseOffset, swapSegment.nextOffset())
// 将 swapSegment 对象加入到 segments 中,并将 oldSegments 中所有的 LogSegment 对象从 segments 中删除,
// 同时删除对应的日志文件和索引文件,最后移除文件的 ".swap" 后缀
this.replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
}

在完成对日志数据的压缩操作后,会将压缩的结果先保存为 swap 文件(以“.swap”作为文件后缀),并最终替换压缩前的日志文件,所以 swap 文件中的数据都是完整,只需要移除对应的“.swap”后缀,并构建对应的 LogSegment 对象即可。但是这里不能简单的将对应的 LogSegment 对象记录到 SkipList 中就万事大吉了,因为 SkipList 中还存在着压缩前的原文件对应的 LogSegment 对象集合,所以需要先将这些 LogSegment 对象集合及其对应的 log 文件和索引文件删除,这也是 Log#replaceSegments 方法的主要逻辑。

完成了前 3 步的工作, 步骤 4 会对前面加载的数据进行校验,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 4. 后处理,如果对应 SkipList 为空,则新建一个空的 activeSegment,如果不为空则校验 HW 之后数据的完整性
if (logSegments.isEmpty) {
// 如果 SkipList 为空,则需要创建一个 activeSegment,保证 SkipList 能够正常操作
segments.put(0L, new LogSegment(dir = dir,
startOffset = 0,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time,
fileAlreadyExists = false,
initFileSize = this.initFileSize(),
preallocate = config.preallocate))
} else {
// 如果 SkipList 不为空,则需要对其中的数据进行验证
if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
// 处理 broker 节点异常关闭导致的数据异常,需要验证 [recoveryPoint, activeSegment] 中的所有消息,并移除验证失败的消息
this.recoverLog()
// reset the index size of the currently active log segment to allow more entries
activeSegment.index.resize(config.maxIndexSize)
activeSegment.timeIndex.resize(config.maxIndexSize)
}
}

如果前面的步骤中并未加载到任何数据,则对应的 SkipList 是空的,为了保证 SkipList 能够正常工作,需要为其添加一个空的 activeSegment 对象。如果 SkipList 不为空则需要依据 log 目录下是否存在“.kafka_cleanshutdown”文件来判定之前 broker 是否是正常关闭的,如果为非正常关闭则需要对 recoveryPoint 之后的数据进行校验,如果数据存在不完整则进行丢弃,相关实现位于 Log#recoverLog 中,比较简单,不再展开。

追加日志数据

Log 类定义了 Log#append 方法,用于往 Log 对象中追加消息数据。需要注意的一点是,Log 对象使用 SkipList 管理多个 LogSegment,我们在执行追加消息时是不能够往 SkipList 中的任意 LogSegment 对象执行追加操作的,Kafka 设计仅允许往 activeSegment 对象中追加消息。方法 Log#append 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
// 1. 解析、校验待追加的消息数据,封装成 LogAppendInfo 对象
val appendInfo = this.analyzeAndValidateRecords(records)
// 如果消息数据个数为 0,则直接返回
if (appendInfo.shallowCount == 0) return appendInfo

// 2. 剔除待追加消息中未通过验证的字节部分
var validRecords = this.trimInvalidBytes(records, appendInfo)

try {
// 将待追加消息中剩余有效的字节追加到 Log 对象中
lock synchronized {
// 3.1 如果指定需要分配 offset
if (assignOffsets) {
// 获取当前 Log 对象对应的最后一个 offset 值,以此开始向后分配 offset
val offset = new LongRef(nextOffsetMetadata.messageOffset)
// 更新待追加消息的 firstOffset 为 Log 对象最后一个 offset 值
appendInfo.firstOffset = offset.value
val now = time.milliseconds
val validateAndOffsetAssignResult = try {
// 对消息(包括压缩后的)的 magic 值进行统一,验证数据完整性,并分配 offset,同时按要求更新消息的时间戳
LogValidator.validateMessagesAndAssignOffsets(
validRecords,
offset,
now,
appendInfo.sourceCodec,
appendInfo.targetCodec,
config.compact,
config.messageFormatVersion.messageFormatVersion,
config.messageTimestampType,
config.messageTimestampDifferenceMaxMs)
} catch {
case e: IOException =>
throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
}
validRecords = validateAndOffsetAssignResult.validatedRecords
appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
// 更新待追加消息的 lastOffset 值
appendInfo.lastOffset = offset.value - 1
// 如果时间戳类型为 LOG_APPEND_TIME,则修改时间戳
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
appendInfo.logAppendTime = now

// 如果在执行 validateMessagesAndAssignOffsets 操作时修改了消息的长度,则需要重新验证,防止消息过长
if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
for (logEntry <- validRecords.shallowEntries.asScala) {
if (logEntry.sizeInBytes > config.maxMessageSize) {
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
throw new RecordTooLargeException(
"Message size is %d bytes which exceeds the maximum configured message size of %s.".format(logEntry.sizeInBytes, config.maxMessageSize))
}
}
}
}
// 3.2 不需要分配 offset
else {
// 如果消息的 offset 不是单调递增,或者消息的 firstOffset 小于 Log 中记录的下一条消息 offset,则说明 appendInfo 非法
if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
throw new IllegalArgumentException("Out of order offsets found in " + records.deepEntries.asScala.map(_.offset))
}

// 4. 校验待追加消息的长度,保证不超过了单个 LogSegment 所允许的最大长度(对应 segment.bytes 配置)
if (validRecords.sizeInBytes > config.segmentSize) {
throw new RecordBatchTooLargeException(
"Message set size is %d bytes which exceeds the maximum configured segment size of %s.".format(validRecords.sizeInBytes, config.segmentSize))
}

// 5. 获取 activeSegment 对象,如果需要则创建新的 activeSegment 对象
val segment = this.maybeRoll(
messagesSize = validRecords.sizeInBytes,
maxTimestampInMessages = appendInfo.maxTimestamp,
maxOffsetInMessages = appendInfo.lastOffset)


// 6. 往 activeSegment 中追加消息
segment.append(
firstOffset = appendInfo.firstOffset,
largestOffset = appendInfo.lastOffset,
largestTimestamp = appendInfo.maxTimestamp,
shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
records = validRecords)

// 7. 更新 LEO 中记录的当前 Log 最后一个 offset 值
this.updateLogEndOffset(appendInfo.lastOffset + 1)

trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
.format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords))

// 8. 如果刷盘时间间隔达到阈值(对应 flush.messages 配置),则执行刷盘
if (unflushedMessages >= config.flushInterval)
this.flush() // 将 [recoveryPoint, logEndOffset) 之间的数据刷盘

appendInfo
}
} catch {
case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
}
}

追加消息数据操作的整体执行流程可以概括为:

  1. 解析并校验待追加的消息集合,将其封装成 LogAppendInfo 对象;
  2. 剔除待追加消息集合中未通过验证的字节部分;
  3. 如果指定需要为消息分配 offset,则对消息(包括压缩后的)执行分配 offset 操作,并对消息执行 magic 值统一、数据完整性校验,以及按需更新消息时间戳等操作;
  4. 如果指定不需要为消息分配 offset,则需要保证消息已有 offset 是单调递增,且起始 offset 不能小于当前 Log 对象中记录的下一条待追加消息的 offset;
  5. 校验处理后消息集合的总长度,保证不超过单个 LogSegment 对象所允许的最大长度;
  6. 获取目标 activeSegment 对象,如果需要则创建一个新的 activeSegment 对象并返回;
  7. 往目标 activeSegment 对象中追加消息数据,并更新当前 Log 对象中记录的下一条待追加消息的 offset 值;
  8. 如果当前时间距离上次执行刷盘操作的时间超过配置的时间间隔,则执行刷盘操作。

下面我们分步骤对整个执行过程进行进一步分析,首先来看 步骤 1 ,实现位于 Log#analyzeAndValidateRecords 方法中,该方法对待追加的消息集合中的消息逐条进行解析和验证,并封装成 LogAppendInfo 对象返回。实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private def analyzeAndValidateRecords(records: MemoryRecords): LogAppendInfo = {
var shallowMessageCount = 0 // 消息条数
var validBytesCount = 0 // 通过验证的消息字节数
var firstOffset = -1L // 第一条消息的 offset
var lastOffset = -1L // 最后一条消息的 offset
var sourceCodec: CompressionCodec = NoCompressionCodec // 生产者使用的压缩方式
var monotonic = true // 标识生产者为消息分配的内部 offset 是否是单调递增的
var maxTimestamp = Record.NO_TIMESTAMP // 消息的最大时间戳
var offsetOfMaxTimestamp = -1L // 最大时间戳消息对应的 offset

// 基于浅层迭代器迭代,对于压缩的消息不会解压缩
for (entry <- records.shallowEntries.asScala) {
// 记录第一条消息的 offset
if (firstOffset < 0) firstOffset = entry.offset
// 如果是单调递增的话,则在遍历过程中 lastOffset 应该始终小于当前的 offset
if (lastOffset >= entry.offset) monotonic = false

// 记录最后一条消息的 offset
lastOffset = entry.offset
// 获取消息数据
val record = entry.record
// 如果待追加的消息长度大于允许的最大值(对应 max.message.bytes 配置),则抛出异常
val messageSize = entry.sizeInBytes
if (messageSize > config.maxMessageSize) {
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %s.".format(messageSize, config.maxMessageSize))
}

// CRC 校验
record.ensureValid()

// 记录当前消息集合中时间戳最大的消息,及其 offset
if (record.timestamp > maxTimestamp) {
maxTimestamp = record.timestamp
offsetOfMaxTimestamp = lastOffset
}

// 浅层消息数加 1
shallowMessageCount += 1
// 更新已验证的字节数
validBytesCount += messageSize

// 解析生产者使用的压缩方式
val messageCodec = CompressionCodec.getCompressionCodec(record.compressionType.id)
if (messageCodec != NoCompressionCodec) sourceCodec = messageCodec
}

// 解析服务端使用的压缩方式(对应 compression.type 配置)
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)

// 封装成 LogAppendInfo 对象返回
LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp,
Record.NO_TIMESTAMP, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
}

概括来说,上述方法主要做了以下 3 件事情:

  1. 对待追加消息集合中的每条消息执行 CRC 校验;
  2. 对待追加消息集合中的每条消息的长度进行校验,保证不超过允许的最大值(对应 max.message.bytes 配置);
  3. 计算待追加消息集合中的 firstOffset、lastOffset、消息的条数、有消息的字节数、offset 是否单调递增,以及获取生产者所指定的消息压缩方式。

步骤 2 会依据步骤 1 中对消息的校验结果,对未通过验证的消息字节部分进行截断,实现位于 Log#trimInvalidBytes 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private def trimInvalidBytes(records: MemoryRecords, info: LogAppendInfo): MemoryRecords = {
// 获取已验证的字节数
val validBytes = info.validBytes
if (validBytes < 0)
throw new CorruptRecordException(
"Illegal length of message set " + validBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
// 所有的字节都是已验证的,则直接返回
if (validBytes == records.sizeInBytes) {
records
}
// 存在未通过验证的字节,对这些异常字节进行截断
else {
val validByteBuffer = records.buffer.duplicate()
validByteBuffer.limit(validBytes)
MemoryRecords.readableRecords(validByteBuffer)
}
}

如果在调用 Log#append 方法时设置了参数 assignOffsets = true,则在追加消息数据之前会为消息重新分配 offset(对应 步骤 3 ),起始 offset 为当前 Log 对象中记录的下一条待追加消息的 offset 值。这一步主要做了以下几件事情:

  1. 更新待追加消息集合的 firstOffset 为当前 Log 对象中记录的下一条待追加消息对应的 offset 值;
  2. 对消息(包括压缩后的)的 magic 值进行统一,验证数据完整性,并分配 offset,同时按要求更新消息的时间戳;
  3. 更新待追加消息集合的 lastOffset 值;
  4. 如果配置了 message.timestamp.type=LogAppendTime,则设置日志追加时间戳;
  5. 对待追加消息集合中的消息进行逐条校验,避免存在过长的消息。

我们重点看一下第 2 步,这一步会执行 offset 分配操作,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private[kafka] def validateMessagesAndAssignOffsets(records: MemoryRecords, // 待追加的消息集合
offsetCounter: LongRef, // 消息对应的 offset 操作对象
now: Long, // 当前时间戳
sourceCodec: CompressionCodec, // 生产者指定的消息压缩方式
targetCodec: CompressionCodec, // 服务端指定的消息压缩方式
compactedTopic: Boolean = false, // 配置的消息清理策略:compact 或 delete
messageFormatVersion: Byte = Record.CURRENT_MAGIC_VALUE,
messageTimestampType: TimestampType,
messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
// 如果未对消息进行压缩处理
if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
// 存在消息的 magic 值与指定的 magic 值不一致
if (!records.hasMatchingShallowMagic(messageFormatVersion)) {
// 对消息的 magic 值进行统一,同时为消息分配 offset
convertAndAssignOffsetsNonCompressed(
records, offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs, messageFormatVersion)
} else {
// 所有消息的 magic 值均一致,则执行 offset 分配,以及验证操作
assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, messageTimestampType, messageTimestampDiffMaxMs)
}
}
// 如果对消息进行了压缩
else {
// 对消息进行解压缩,对深层消息进行 magic 值统一,并执行 offset 分配,以及验证操作
validateMessagesAndAssignOffsetsCompressed(
records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic, messageFormatVersion, messageTimestampType, messageTimestampDiffMaxMs)
}
}

由上面的实现可以看到,不管消息是否经过压缩,如果指定了需要为消息分配 offset,则需要处理所有的消息,包括经过压缩过的消息。方法 LogValidator#validateMessagesAndAssignOffsets 的主要工作也就是依据消息是否被压缩来分别调用对应的方法对待追加消息统一 magic 值,并执行 offset 分配、数据完整性校验,以及按需更新消息时间戳等操作,如果消息是经过压缩的,那么会对其进行解压缩。相关的方法实现比较冗长,这里不再继续深入。

如果指定不需要重新分配 offset 值,那么处理过程将会简单很多,仅仅需要验证消息已有的 offset 是否是单调递增的,并且待追加消息集合中消息的 firstOffset 不能小于 Log 对象中记录的下一条待追加消息的 offset 值,否则说明待追加的消息集合是非法的,这也是 步骤 4 的主要工作。

步骤 5 会校验处理后消息集合的长度,保证不超过单个 LogSegment 对象所允许的最大长度(对应 segment.bytes 配置)。

在完成了一系列准备工作之后,接下去可以将处理后的待追加消息数据写入 activeSegment 对象中。 步骤 6 调用了 Log#maybeRoll 方法尝试从 SkipList 中获取目标 activeSegment 对象,并在需要时创建新的 activeSegment 对象。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private def maybeRoll(messagesSize: Int, // 待追加的消息长度
maxTimestampInMessages: Long, // 消息中的最大时间戳
maxOffsetInMessages: Long // 消息的 lastOffset
): LogSegment = {
// 获取当前的 activeSegment 对象
val segment = activeSegment
val now = time.milliseconds
val reachedRollMs = segment.timeWaitedForRoll(now, maxTimestampInMessages) > config.segmentMs - segment.rollJitterMs
if (segment.size > config.segmentSize - messagesSize // 当前 activeSegment 在追加本次消息之后,长度超过 LogSegment 允许的最大值
|| (segment.size > 0 && reachedRollMs) // 当前 activeSegment 的存活时间超过了允许的最大时间
|| segment.index.isFull || segment.timeIndex.isFull // 索引文件满了
|| !segment.canConvertToRelativeOffset(maxOffsetInMessages)) { // 当前消息的 lastOffset 相对于 baseOffset 超过了 Integer.MAX_VALUE
// 创建新的 activeSegment
this.roll(maxOffsetInMessages - Integer.MAX_VALUE)
} else {
// 不需要创建新的 activeSegment,直接返回
segment
}
}

如果满足以下条件之一,则会创建一个新的 activeSegment 对象:

  1. 当前 activeSegment 对象在追加本次消息之后,长度超过 LogSegment 允许的最大值(对应 segment.bytes 配置)。
  2. 当前 activeSegment 对象的存活时间超过了允许的最大时间(对应 segment.ms 配置)。
  3. 对应的索引文件(index 和 timeindex)满了。

创建新 activeSegment 对象的过程位于 Log#roll 方法中,这里先不展开,后面会专门进行分析。

既然已经拿到了目标 activeSegment 对象,那么下一步( 步骤 7 )就是将待追加的消息数据写入 activeSegment 对象中(调用 LogSegment#append 方法,前面已经分析过)。写入成功之后需要更新 Log 对象本地记录的下一条待追加消息对应的 offset 值。

最后( 步骤 8 ),方法会检测当前时间距离上一次执行刷盘的时间是否超过配置的时间间隔(对应 flush.messages 配置),是则执行刷盘操作。相关实现位于 Log#flush 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def flush(): Unit = this.flush(this.logEndOffset)

def flush(offset: Long): Unit = {
// 如果 offset 小于等于 recoveryPoint,则直接返回,因为之前的已经全部落盘了
if (offset <= recoveryPoint)
return
debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " + time.milliseconds + " unflushed = " + unflushedMessages)
// 获取 [recoveryPoint, offset) 之间的 LogSegment 对象
for (segment <- this.logSegments(recoveryPoint, offset))
segment.flush() // 执行刷盘操作,包括 log、index 和 timeindex 文件
lock synchronized {
// 如果当前已经刷盘的 offset 大于之前记录的 recoveryPoint,则更新 recoveryPoint
if (offset > recoveryPoint) {
// 更新 recoveryPoint 值
this.recoveryPoint = offset
// 更新最近一次执行 flush 的时间
lastflushedTime.set(time.milliseconds)
}
}
}

执行刷盘操作之前会先将当前 offset 与 recoveryPoint 变量进行比较,这里的 offset 对应当前 Log 对象中记录的下一条待追加消息的 offset,而 recoveryPoint 变量在当前 Log 对象创建时指定,并在运行过程中更新,用于表示当前已经刷盘的日志数据对应的最大 offset 值。如果当前 offset 小于等于 recoveryPoint,则无需执行刷盘操作,因为 recoveryPoint 之前的数据已经全部落盘了。否则会调用 Log#logSegments 方法从当前 Log 对象的 SkipList 中获取位于 [recoveryPoint, offset) 区间的 LogSegment 对象集合,并应用 LogSegment#flush 方法对 LogSegment 相关的文件执行刷盘操作,包括 log、index 和 timeindex 文件。同时会更新 recoveryPoint 和 lastflushedTime 字段,后者用于记录最近一次执行刷盘操作的时间戳。

创建 Active Segment 对象

既然上一小节提到了 Log#roll 方法,那么本小节就来分析一下该方法的实现,该方法用于创建一个新的 activeSegment 对象,并将上任的 activeSegment 对象中的数据落盘。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def roll(expectedNextOffset: Long = 0): LogSegment = {
val start = time.nanoseconds
lock synchronized {
// 获取 LEO 值
val newOffset = Math.max(expectedNextOffset, logEndOffset)
val logFile = Log.logFile(dir, newOffset) // 对应的 log 文件
val indexFile = indexFilename(dir, newOffset) // 对应的 index 文件
val timeIndexFile = timeIndexFilename(dir, newOffset) // 对应的 timeindex 文件
// 遍历检查,如果文件存在则删除
for (file <- List(logFile, indexFile, timeIndexFile); if file.exists) {
warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
file.delete()
}

// 处理之前的 activeSegment 对象
segments.lastEntry() match {
case null =>
case entry =>
val seg: LogSegment = entry.getValue
// 追加最大时间戳与对应的 offset 到 timeindex 文件
seg.onBecomeInactiveSegment()
// 对 log、index 和 timeindex 文件进行截断处理,仅保留有效字节
seg.index.trimToValidSize()
seg.timeIndex.trimToValidSize()
seg.log.trim()
}

// 创建新的 activeSegment 对象
val segment = new LogSegment(
dir,
startOffset = newOffset,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time,
fileAlreadyExists = false,
initFileSize = initFileSize(),
preallocate = config.preallocate)

// 添加新的 activeSegment 到 segments 跳跃表中
val prev = this.addSegment(segment)
// 如果对应位置已经存在 LogSegment,则抛出异常
if (prev != null)
throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))

// 因为有新的 activeSegment 对象创建,所以更新 Log 中记录的 activeSegment 的 baseOffset 值,及其物理地址
this.updateLogEndOffset(nextOffsetMetadata.messageOffset)

// 执行 flush 操作,将上任 activeSegment 的数据落盘
scheduler.schedule("flush-log", () => this.flush(newOffset))

info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0 * 1000.0)))

// 返回新的 activeSegment 对象
segment
}
}

创建一个新的 activeSegment 对象的过程比较直观,无非是创建一个新的 activeSegment 对象,并将其添加到 SkipList 中,同时需要更新 Log 对象本地记录的 activeSegment 对象的 baseOffset 及其物理地址。此外,我们需要将上一任 activeSegment 对象中的数据落盘,Kafka 为此注册了一个名为 flush-log 的定时任务异步处理该过程,需要注意的是这里的 flush-log 任务仅运行一次。这里的刷盘操作是将 recoveryPoint 到新 activeSegment 对象 baseOffset (不包括)之间的数据落盘,具体的落盘操作交由 Log#flush 方法执行,我们在前面已经分析过该方法,这里不再重复撰述。

读取日志数据

下面接着来看一下从 Log 对象中读取日志数据的过程,位于 Log#read 方法中。不同于追加消息时只能操作 activeSegment 对象,读取消息可以从 SkipList 中任意一个 LogSegment 对象中进行读取。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def read(startOffset: Long, // 读取消息的起始 offset
maxLength: Int, // 读取消息的最大字节数
maxOffset: Option[Long] = None, // 读取消息的结束 offset
minOneMessage: Boolean = false): FetchDataInfo = {

trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))

// 将 nextOffsetMetadata 保存成局部变量,避免加锁带来的竞态条件
val currentNextOffsetMetadata = nextOffsetMetadata
// 获取 Log 本地记录的下一条待追加消息消息对应的 offset 值
val next = currentNextOffsetMetadata.messageOffset
// 边界检查
if (startOffset == next)
return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY)

// 查找 baseOffset 小于等于 startOffset 且最大的 LogSegment 对象
var entry = segments.floorEntry(startOffset)

// 边界检查,Log 对象中记录的最后一条消息的真实 offset 应该是 next-1,next 指的是下一条追加消息的 offset
if (startOffset > next || entry == null)
throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %s to %d.".format(startOffset, segments.firstKey, next))

while (entry != null) {
// 获取待读取的最大物理地址
val maxPosition = {
// 如果当前读取的是 activeSegment 对象
if (entry == segments.lastEntry) {
// 从 nextOffsetMetadata 对象中获取 activeSegment 对应的最大物理地址
val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
// 如果期间正好创建了一个新的 activeSegment 对象,那么这里拿到的应该是上一任 activeSegment 对象,
// 它已经不再活跃了,可以直接读取到结尾
if (entry != segments.lastEntry)
entry.getValue.size
// 否则,直接返回 exposedPos,如果这里读取到 LogSegment 结尾的话,可能会出现 OffsetOutOfRangeException 异常
else
exposedPos
}
// 如果当前读取的不是 activeSegment 对象,则直接读取到对应 LogSegment 的结尾
else {
entry.getValue.size
}
}

// 调用 LogSegment#read 方法读取消息
val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage)
if (fetchInfo == null) {
// 如果没有读取到消息,则尝试读取下一个 LogSegment 对象
entry = segments.higherEntry(entry.getKey)
} else {
return fetchInfo
}
}

// 未读取到 startOffset 之后的消息
FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
}

读取日志数据的执行过程如代码注释,比较直观,在做好边界检查的前提下寻找小于 startOffset 的最大 baseOffset,并以此 offset 开始从 SkipList 中定位 LogSegment 对象,如果该 LogSegment 对象为空,则会继续读取下一个 LogSegment 对象。读取的过程区分是不是 activeSegment 对象,如果当前读取的 LogSegment 不是 activeSegment 对象,那么对应的 LogSegment 已经是“冷却”状态,所以我们可以直接将其中的数据全部读取出来返回,如果当前读取的是 activeSegment 对象,则需要以 Log 对象中记录的 activeSegment 对象的最大物理地址作为读取的上界,如果直接读取到 activeSegment 对象结尾可能导致 OffsetOutOfRangeException 异常。考虑下面这样一个场景(假设有读线程 A 和写线程 B):

  1. A 线程请求读取 startOffset 为 101 之后的数据,刚好该请求落在了 activeSegment 对象上;
  2. B 线程调用 append 方法追加了 offset 为 [105, 109] 的消息集合,但是还未更新 Log 对象本地记录的下一条消息对应的 offset 值(此时仍为 105);
  3. A 线程读取到了 [101, 109] 之间的数据,并且继续请求 startOffset 为 110 之后的数据,但是因为 startOffset > next 而抛出 OffsetOutOfRangeException 异常。

所以对于 activeSegment 对象而言,我们应该以 Log 对象中记录的 activeSegment 对应的最大物理地址作为上界。另外一个需要考虑的问题是在读取 activeSegment 对象过程中,因为追加消息而产生了新的 activeSegment 对象的情况,那么此时 Log#read 方法持有的 activeSegment 对象就变成前任了,也就不会再有写操作同时发生的问题,所以可以直接读取到该 activeSegment 对象的结尾位置。

删除日志数据

本章节的最后,一起来看一下 Log#delete 方法,该方法会删除当前 Log 对象对应 log 目录,以及目录下的所有文件,并清空 SkipList 对象。方法实现如下:

1
2
3
4
5
6
7
8
9
10
private[log] def delete() {
lock synchronized {
// 遍历 SkipList 中每个 LogSegment 对应的 log、index 和 timeindex 文件
logSegments.foreach(_.delete())
// 清空 SkipList 对象
segments.clear()
// 删除 log 目录及其目录下的所有文件和目录
Utils.delete(dir)
}
}

具体逻辑如代码注释,比较简单。

LogManager 组件

LogManager 是 Kafka 日志数据操作的入口,基于上一节分析的 Log 类对象提供了对日志数据的加载、创建、删除,以及查询等功能。我们在配置 Kafka 服务时,可以通过 log.dirs 配置项为一个 broker 节点指定多个 log 目录,这些目录均由 LogManager 负责管理。LogManager 在启动时会校验 log.dirs 配置,确保指定的 log 目录没有重复的配置且都是可读的,同时对于不存在的目录会执行创建。每个 log 目录下包含多个 topic 分区目录,每个 topic 分区目录由一个 Log 类对象对其进行管理,LogManager 会记录每个 topic 分区对象及其对应的 Log 类对象之间的映射关系。LogManager 类的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class LogManager(val logDirs: Array[File], // log 目录集合,对应 log.dirs 配置,一般选择 log 数目最少的目录进行创建
val topicConfigs: Map[String, LogConfig], // topic 相关配置
val defaultConfig: LogConfig,
val cleanerConfig: CleanerConfig, // log cleaner 相关配置
ioThreads: Int, // 每个 log 目录下分配的执行加载任务的线程数目
val flushCheckMs: Long,
val flushCheckpointMs: Long,
val retentionCheckMs: Long,
scheduler: Scheduler, // 定时任务调度器
val brokerState: BrokerState, // 当前 broker 节点的状态
time: Time) extends Logging {

/**
* 每个 log 目录下面都有一个 recovery-point-offset-checkpoint 文件,
* 记录了当前 log 目录每个 Log 的 recoveryPoint 信息,用于在 broker 启动时恢复日志数据
*/
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
/** 创建或删除 Log 时的锁对象 */
private val logCreationOrDeletionLock = new Object
/** 记录每个 topic 分区对象与 Log 对象之间的映射关系 */
private val logs = new Pool[TopicPartition, Log]()
/** 记录需要被删除的 Log 对象 */
private val logsToBeDeleted = new LinkedBlockingQueue[Log]()
/** 尝试对每个 log 目录在文件系统层面加锁,这里加的是进程锁 */
private val dirLocks = this.lockLogDirs(logDirs)
/**
* 遍历为每个 log 目录创建一个操作其名下 recovery-point-offset-checkpoint 文件的 OffsetCheckpoint 对象,
* 并建立映射关系
*/
private val recoveryPointCheckpoints = logDirs.map(
// recovery-point-offset-checkpoint 文件
dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
/** 用于清理过期或者过大的日志 */
val cleaner: LogCleaner = if (cleanerConfig.enableCleaner) new LogCleaner(cleanerConfig, logDirs, logs, time = time) else null

// ... 省略方法定义

}

LogManager 在实例化过程中会执行以下操作:

  1. 遍历处理配置的 log 路径(对应 log.dirs 配置),如果对应的路径不存在则创建,同时校验路径是否存在重复、是否是目录,以及是否可读;
  2. 遍历配置的 log 目录,尝试对每个目录在文件系统层面加锁,这里加的是进程锁;
  3. 遍历配置的 log 目录,为每个目录下的 recovery-point-offset-checkpoint 文件创建对应的 OffsetCheckpoint 对象,用于管理每个 topic 分区对应的 HW offset 信息;
  4. 遍历配置的 log 目录,将每个 topic 分区对应的日志数据封装成 Log 对象,同时记录需要被删除的 topic 分区目录,等待后续删除。

文件 recovery-point-offset-checkpoint 用于记录每个 topic 分区对应的 HW offset 信息,当 broker 节点重启时辅助恢复每个 topic 分区的日志数据。一个简单的文件示例如下:

1
2
3
4
5
6
7
8
9
10
0
8
topic-default 3 2271154
topic-default 2 2271351
topic-default 4 2271051
topic-default 0 2270751
topic-default 5 2271558
topic-default 1 2272018
topic-default 7 2271197
topic-default 6 2270673

其中第一行是版本号,第二行是记录条数,从第三行开始每一行都记录着“topic partition HW”信息。OffsetCheckpoint 类定义了 OffsetCheckpoint#writeOffsetCheckpoint#read 两个方法,用于对 recovery-point-offset-checkpoint 执行读写操作。

步骤 4 会执行加载每个 log 目录下的日志文件,并为每个 topic 分区对应的日志目录创建一个 Log 对象,对于标记为需要删除的 topic 分区目录(对应“-delete”后缀的目录),则将其 Log 对象添加到 LogManager#logsToBeDeleted 字段中,等待后面的周期性任务(kafka-delete-logs)对其进行删除。相关实现位于 LogManager#loadLogs 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
private def loadLogs(): Unit = {
info("Loading logs.")
val startMs = time.milliseconds
// 用于记录所有 log 目录对应的线程池
val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
val jobs = mutable.Map.empty[File, Seq[Future[_]]]

// 遍历处理每个 log 目录
for (dir <- this.logDirs) {
// 为每个 log 目录创建一个 ioThreads 大小的线程池
val pool = Executors.newFixedThreadPool(ioThreads)
threadPools.append(pool)

// 尝试获取 .kafka_cleanshutdown 文件,如果该文件存在则说明 broker 节点是正常关闭的
val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
if (cleanShutdownFile.exists) {
debug("Found clean shutdown file. Skipping recovery for all logs in data directory: " + dir.getAbsolutePath)
} else {
// 当前 broker 不是正常关闭,设置 broker 状态为 RecoveringFromUncleanShutdown,表示正在从上次异常关闭中恢复
brokerState.newState(RecoveringFromUncleanShutdown)
}

// 读取每个 log 目录下的 recovery-point-offset-checkpoint 文件,返回 topic 分区对象与 HW 之间的映射关系
var recoveryPoints = Map[TopicPartition, Long]()
try {
recoveryPoints = this.recoveryPointCheckpoints(dir).read()
} catch {
case e: Exception =>
warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e)
warn("Resetting the recovery checkpoint to 0")
}

// 遍历当前 log 目录的子目录,仅处理目录,忽略文件
val jobsForDir = for {
dirContent <- Option(dir.listFiles).toList
logDir <- dirContent if logDir.isDirectory
} yield {
// 为每个 Log 目录创建一个 Runnable 任务
CoreUtils.runnable {
debug("Loading log '" + logDir.getName + "'")
// 依据目录名解析得到对应的 topic 分区对象
val topicPartition = Log.parseTopicPartitionName(logDir)
// 获取当前 topic 分区对应的配置
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
// 获取 topic 分区对应的 HW 值
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)

// 创建对应的 Log 对象,每个 topic 分区目录对应一个 Log 对象
val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
// 如果当前 log 是需要被删除的文件,则记录到 logsToBeDeleted 队列中,会有周期性任务对其执行删除操作
if (logDir.getName.endsWith(Log.DeleteDirSuffix)) { // -delete
logsToBeDeleted.add(current)
} else {
// 建立 topic 分区对象与其 Log 对象之间的映射关系,不允许一个 topic 分区对象对应多个目录
val previous = logs.put(topicPartition, current)
if (previous != null) {
throw new IllegalArgumentException(
"Duplicate log directories found: %s, %s!".format(current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
}
}
}
}

// 提交上面创建的任务,并将提交结果封装到 jobs 集合中,jobsForDir 是 List[Runnable] 类型
jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)
}

// 阻塞等待上面提交的任务执行完成,即等待所有 log 目录下 topic 分区对应的目录文件加载完成
try {
for ((cleanShutdownFile, dirJobs) <- jobs) {
dirJobs.foreach(_.get)
// 删除对应的 .kafka_cleanshutdown 文件
cleanShutdownFile.delete()
}
} catch {
case e: ExecutionException =>
error("There was an error in one of the threads during logs loading: " + e.getCause)
throw e.getCause
} finally {
// 遍历关闭线程池
threadPools.foreach(_.shutdown())
}

info(s"Logs loading complete in ${time.milliseconds - startMs} ms.")
}

LogManager 在实例化时会为每个 log 目录创建一个指定大小的线程池,然后对目录下的子目录(不包括文件)进行并发加载,最终将每个 topic 分区目录下的日志相关数据封装成 Log 对象,并记录到 LogManager#logs 字段中,这是一个 Pool[K, V] 类型的字段,基于 ConcurrentHashMap 实现,其中这里的 key 为 Log 对象所属的 topic 分区对象。

在 LogManager 启动时(对应 LogManager#startup 方法)会注册一个名为 kafka-delete-logs 的周期性任务,该任务会周期性调用 LogManager#deleteLogs 方法对标记为“-delete”的目录执行删除操作。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private def deleteLogs(): Unit = {
try {
var failed = 0
// 如果存在需要删除的目录
while (!logsToBeDeleted.isEmpty && failed < logsToBeDeleted.size()) {
// 获取需要删除的目录对应的 Log 对象
val removedLog = logsToBeDeleted.take()
if (removedLog != null) {
try {
// 调用 Log.delete 方法执行删除操作
removedLog.delete()
info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
} catch {
case e: Throwable =>
error(s"Exception in deleting $removedLog. Moving it to the end of the queue.", e)
failed = failed + 1
// 如果删除异常,则归还,下一次周期性调用时再删除
logsToBeDeleted.put(removedLog)
}
}
}
} catch {
case e: Throwable =>
error(s"Exception in kafka-delete-logs thread.", e)
}
}

方法 LogManager#deleteLogs 会遍历 LogManager#logsToBeDeleted 队列,并对其中的 Log 对象调用 Log#delete 方法执行删除,如果删除异常则会归还到队列,并在下一次周期性调用时再尝试执行删除。方法 Log#delete 已经在前面分析过,这里不再重复撰述。

周期性定时任务

前面分析了启动过程中激活的 kafka-delete-logs 周期性任务,下面继续来看一下 LogManager#startup 方法的剩余实现,该方法主要的逻辑就是启动 4 个周期性任务。在 Kafka 服务启动时会创建 LogManager 实例,并调用 LogManager#startup 方法,该方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def startup() {
if (scheduler != null) {
// 1. 启动 kafka-log-retention 周期性任务,对过期或过大的日志文件执行清理工作
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
this.cleanupLogs,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)

// 2. 启动 kafka-log-flusher 周期性任务,对日志文件执行刷盘操作
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
scheduler.schedule("kafka-log-flusher",
this.flushDirtyLogs,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)

// 3. 启动 kafka-recovery-point-checkpoint 周期性任务,更新 recovery-point-offset-checkpoint 文件
scheduler.schedule("kafka-recovery-point-checkpoint",
this.checkpointRecoveryPointOffsets,
delay = InitialTaskDelayMs,
period = flushCheckpointMs,
TimeUnit.MILLISECONDS)

// 4. 启动 kafka-delete-logs 周期性任务,删除标记为需要被删除的 log 目录
scheduler.schedule("kafka-delete-logs",
this.deleteLogs,
delay = InitialTaskDelayMs,
period = defaultConfig.fileDeleteDelayMs,
TimeUnit.MILLISECONDS)
}

// 启动 LogCleaner 线程
if (cleanerConfig.enableCleaner) cleaner.startup()
}

LogManager 在启动过程中启动了 4 个周期性任务和 1 个 LogCleaner 线程,这 4 个周期性任务包括:

  1. kafka-log-retention :定期对过期或过大的日志文件执行清理操作。
  2. kafka-log-flusher :定期对日志文件执行刷盘操作。
  3. kafka-recovery-point-checkpoint :定期更新 recovery-point-offset-checkpoint 文件。
  4. kafka-delete-logs :定期删除标记为需要被删除的 log 目录。

其中任务 4 我们已经在前面分析过,下面逐个来看一下前 3 个任务。 任务 1 的实现位于 LogManager#cleanupLogs 方法中,该方法会遍历所有的 Log 对象,并从两个维度对执行清理工作:

  1. 时间维度:即保证 Log 对象中所有的 LogSegment 都是有效的,对于过期的 LogSegment 执行删除操作。
  2. 空间维度:既保证 Log 对象不应过大,对于超出的部分会执行删除操作。

实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
def cleanupLogs() {
debug("Beginning log cleanup...")
var total = 0
val startMs = time.milliseconds
// 遍历处理每个 topic 分区对应的 Log 对象,只有对应 Log 配置了 cleanup.policy=delete 才会执行删除
for (log <- allLogs(); if !log.config.compact) {
debug("Garbage collecting '" + log.name + "'")
// 遍历删除当前 Log 对象中过期的 LogSegment 对象,并保证 Log 的大小在允许范围内(对应 retention.bytes 配置)
total += log.deleteOldSegments()
}
debug("Log cleanup completed. " + total + " files deleted in " + (time.milliseconds - startMs) / 1000 + " seconds")
}

清理操作仅处理配置了 cleanup.policy=delete 的 Log 对象,并调用 Log#deleteOldSegments 方法执行判定和删除操作。方法 Log#deleteOldSegments 中通过调用 Log#deleteRetentionMsBreachedSegments 对过期的 LogSegment 对象执行删除操作,并调用 Log#deleteRetentionSizeBreachedSegments 方法对当前 Log 对象的大小进行判定,如果超过设定大小,则会从 Log 对象中删除部分 LogSegment 对象,以保证最终的 Log 大小在允许范围内。这两个方法最终都是调用 Log#deleteOldSegments 方法执行具体的删除操作,该方法接收一个 LogSegment => Boolean 类的函数,如果某个 LogSegment 对象满足给定的谓语,则会应用 Log#deleteSegment 方法对该 LogSegment 执行删除操作。

其中 Log#deleteRetentionMsBreachedSegments 方法给定的判定条件很简单(如下),比较当前 LogSegment 对象最大消息时间戳距离当前时间是否超过 retention.ms 毫秒,如果超过则认为该 LogSegment 已过期。

1
2
3
4
5
6
private def deleteRetentionMsBreachedSegments(): Int = {
if (config.retentionMs < 0) return 0
val startMs = time.milliseconds
// 如果 LogSegment 中最大时间戳距离当前已经超过配置时间,则删除
this.deleteOldSegments(startMs - _.largestTimestamp > config.retentionMs)
}

Log#deleteRetentionSizeBreachedSegments 方法则会首先计算出当前 Log 超出设定值(对应 retention.bytes 配置)的字节数,然后对 Log 中的 LogSegment 对象遍历删除,直到 Log 的大小不再超出为止。实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private def deleteRetentionSizeBreachedSegments(): Int = {
if (config.retentionSize < 0 || size < config.retentionSize) return 0
// Log 的总大小减去允许的大小
var diff = size - config.retentionSize

def shouldDelete(segment: LogSegment): Boolean = {
// 大于等于 0 则说明仍然过大
if (diff - segment.size >= 0) {
diff -= segment.size
true
} else {
false
}
}

// 删除 Log 中超出大小的部分
this.deleteOldSegments(shouldDelete)
}

接下来继续看一下公共逻辑 Log#deleteOldSegments 方法(实现如下),该方法会基于给定的谓语 predicate 从 Log 中选择需要被删除的 LogSegment 对象,并对每个需要被删除的 LogSegment 对象应用 Log#deleteSegment 方法进行删除,包括从 Log 对象中移除该 LogSegment 对象,以及删除 LogSegment 对应的 log、index 和 timeindex 文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
lock synchronized {
// 检查当前 Log 中的 LogSegment 是否满足删除条件,并返回需要被删除的 LogSegment 对象集合
val deletable = this.deletableSegments(predicate)
val numToDelete = deletable.size
if (numToDelete > 0) {
// 如果当前 Log 中所有的 LogSegment 对象都需要被删除,则在删除之前创建一个新的 activeSegment 对象,保证 Log 可以正常运行
if (segments.size == numToDelete) this.roll()
// 遍历删除需要删除的 LogSegment 对象及其相关数据文件
deletable.foreach(deleteSegment)
}
// 返回被删除的 LogSegment 数目
numToDelete
}
}

如果本次删除操作需要删除 Log 中全部的 LogSegment 对象,则会调用 Log#roll 方法为当前 Log 对象的 SkipList 创建一个新的 activeSegment 对象,以保证 Log 的正常运行,该方法的实现在前面已经分析过,不再重复撰述。

再来看一下周期性 任务 2 ,该任务用于定期对日志文件执行刷盘(flush)操作。相关逻辑实现位于 LogManager#flushDirtyLogs 方法中,该方法会遍历处理每个 topic 分区对应的 Log 对象,通过记录在 Log 对象中的上次执行 flush 的时间戳与当前时间对比,如果时间差值超过一定的阈值(对应 flush.ms 配置),则调用 Log#flush 方法执行刷盘操作,该方法的实现同样在前面已经分析过,不再重复撰述。

接着来看一下周期性 任务 3 ,该任务用于定期更新每个 log 目录名下的 recovery-point-offset-checkpoint 文件。相关实现位于 LogManager#checkpointRecoveryPointOffsets 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
def checkpointRecoveryPointOffsets() {
// 为每个 log 目录应用 checkpointLogsInDir 方法
logDirs.foreach(checkpointLogsInDir)
}

private def checkpointLogsInDir(dir: File): Unit = {
// 获取指定 log 目录对应的 Map[TopicPartition, Log] 集合
val recoveryPoints = logsByDir.get(dir.toString)
if (recoveryPoints.isDefined) {
// 更新对应的 recovery-point-offset-checkpoint 文件
this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
}
}

方法会获取位于指定 log 目录下所有 topic 分区对应的 recoveryPoint 值(即当前已经落盘的日志的最大 offset),并全量更新 log 目录下的 recovery-point-offset-checkpoint 文件。

重复日志数据清理

本小节来看一下 LogCleaner 线程,如果在配置中指定了 log.cleaner.enable=true,那么在 LogManager#startup 方法的最后会调用 LogCleaner#startup 方法启动 LogCleaner 线程对日志数据执行清理工作。前面我们在分析周期性任务 kafka-log-retention 时,已经知道该周期性任务会对日志中过大或过期的 LogSegment 对象执行清理操作,那么 LogCleaner 又是对什么执行清理呢?

我们知道 Kafka 对于生产者发来的消息都是顺序追加到日志文件中的,而 Kafka 又采用本地文件系统对日志文件进行存储,所以随着时间的流逝日志文件会越来越大,其中存储的相当一部分消息数据都具备相同的 key。如果配置了 cleanup.policy=compact 策略,那么 Kafka 的 LogCleaner 线程就会对具备相同 key 的消息进行清理操作,仅保留当前具备最大 offset 的 key 的消息。

LogCleaner 在执行清理操作时会将一个 log 分割成 clean 和 dirty 两部分。其中 clean 是上次完成清理的部分,Kafka 会在对应 log 目录下生成一个 cleaner-offset-checkpoint 文件,用于记录每个 topic 分区上一次执行清理操作的 offset 值,而 dirty 部分则是本次清理操作的目标区域,但是 dirty 中并不是所有的 LogSegment 对象都会执行清理操作,Kafka 又将这一部分分为了 cleanable 和 uncleanable 两块,能够被分为 uncleanable 的 LogSegment 对象包含两类:

  1. 当前 Log 对象中的 activeSegment 对象。
  2. LogSegment 对象中的最大消息时间戳距离当前时间位于配置的滞后压缩时间(对应 min.compaction.lag.ms 配置)范围内。

其中不清理 activeSegment 对象,主要是为了防止竞态条件,因为 activeSegment 是可以写入的对象,这样会让清理操作变得复杂,且收益不大。

下面我们从 LogCleaner#startup 方法开始,整个清理工作主要涉及 LogCleaner、LogCleanerManager、CleanerThread,以及 Cleaner 这 4 个类。方法 LogCleaner#startup 的主要作用就是启动注册在 LogCleaner 中的 CleanerThread 线程集合。CleanerThread 继承自 ShutdownableThread 抽象类,所以 CleanerThread#doWork 方法是其处理入口,该方法只是简单调用了 CleanerThread#cleanOrSleep 方法,后者会选取一个最需要被清理的 LogSegment 区间,并执行清理工作。相关实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private def cleanOrSleep() {
// 选取下一个最需要进行日志清理的 LogToClean 对象
val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
// 没有需要被清理的 LogToClean 对象,休息一会后继续尝试
case None =>
false
// 执行消息清理操作
case Some(cleanable) =>
var endOffset = cleanable.firstDirtyOffset
try {
// 调用 Cleaner#clean 方法执行清理工作
val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable)
recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
endOffset = nextDirtyOffset
} catch {
case _: LogCleaningAbortedException => // task can be aborted, let it go.
} finally {
// 对 Log 的清理状态进行转换,如果当前 topic 分区的清理状态是 LogCleaningInProgress,则更新 cleaner-offset-checkpoint 文件
cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
}
true
}

// 获取所有启用了 compact 和 delete 清理策略的 Log 对象,并将其对应的 topic 分区状态设置为 LogCleaningInProgress
val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
deletable.foreach {
case (topicPartition, log) =>
try {
// 对设置了清理策略为 delete 的 LogSegment 执行删除操作,删除过期或过大的 LogSegment 对象。
log.deleteOldSegments()
} finally {
// 移除这些 topic 分区对应的 LogCleaningInProgress 状态
cleanerManager.doneDeleting(topicPartition)
}
}

// 如果没有需要执行清理的 LogToClean 对象,则休息一会后继续重试
if (!cleaned) backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
}

清理操作的执行流程如下:

  1. 选取一个最需要进行日志清理的 LogToClean 对象,如果存在则执行清理操作;
  2. 如果配置了 cleanup.policy=delete 策略,则对 Log 对象中过大或过期的 LogSegment 对象执行删除操作;
  3. 如果没有需要进行清理的 LogToClean 对象,则休息一会儿后重试。

其中第 2 步与前面介绍的周期性任务 kafka-log-retention 类似,这里我们重点来看一下第 1 步,这一步的核心操作是调用 LogCleanerManager#grabFilthiestCompactedLog 方法选取下一个最需要被清理的 LogToClean 对象,然后调用 Cleaner#clean 依据该对象执行清理操作。

方法 LogCleanerManager#grabFilthiestCompactedLog 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def grabFilthiestCompactedLog(time: Time): Option[LogToClean] = {
inLock(lock) {
val now = time.milliseconds
this.timeOfLastRun = now

// 读取 log 目录下的 cleaner-offset-checkpoint 文件,获取每个 topic 分区上次清理操作的 offset 边界
val lastClean = allCleanerCheckpoints
val dirtyLogs = logs.filter {
// 过滤掉 cleanup.policy 配置为 delete 的 Log 对象,因为不需要压缩
case (_, log) => log.config.compact // match logs that are marked as compacted
}.filterNot {
// 过滤掉所有正在执行清理工作的 Log 对象
case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress
}.map {
// 将需要被清理的区间封装成 LogToClean 对象
case (topicPartition, log) => // create a LogToClean instance for each
// 计算需要执行清理操作的 offset 区间
val (firstDirtyOffset, firstUncleanableDirtyOffset) =
LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, now)
// 构建清理区间对应的 LogToClean 对象
LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset)
}.filter(ltc => ltc.totalBytes > 0) // 忽略待清理区间数据为空的 LogToClean 对象

// 获取待清理区间最大的 cleanableRatio 比率
this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
// 过滤掉所有 cleanableRatio 小于等于配置值(对应 min.cleanable.dirty.ratio 配置)的 LogToClean 对象
val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
if (cleanableLogs.isEmpty) {
None
} else {
// 基于需要清理的数据占比选择最需要执行清理的 LogToClean 对象
val filthiest = cleanableLogs.max
// 更新对应 topic 分区的清理状态为 LogCleaningInProgress
inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
Some(filthiest)
}
}
}

上述方法的执行流程如下:

  1. 读取每个 log 目录下 cleaner-offset-checkpoint 文件,解析每个 topic 分区上次执行清理操作对应的 offset 值;
  2. 遍历处理 Log 对象集合,筛选符合要求的 Log 对象(Log 对象配置的清理策略 cleanup.policy=compact,且该 Log 对象当前没有正在执行清理操作),并与其需要被清理的区间一起封装成对应的 LogToClean 对象;
  3. 过滤掉不包含数据,以及待清理数据占比不超过指定阈值(对应 min.cleanable.dirty.ratio 配置)的 LogToClean 对象,并从剩下的 LogToClean 集合中选择待清理数据占比最高的 LogToClean 对象。

计算待清理区间的过程由 LogCleanerManager#cleanableOffsets 方法实现,区间值包括 dirty 部分的起始 offset 值和 uncleanable LogSegment 对象的 baseOffset 值。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def cleanableOffsets(log: Log, // 待清理的 Log 对象
topicPartition: TopicPartition, // 对应的 topic 分区对象
lastClean: immutable.Map[TopicPartition, Long], // 记录每个 topic 分区上一次清理操作的结束 offset
now: Long): (Long, Long) = {

// 获取当前 topic 分区上次清理的 offset,即下一次需要被清理的 Log 的起始 offset
val lastCleanOffset: Option[Long] = lastClean.get(topicPartition)

// 获取当前 Log 对象 SkipList 中首个 LogSegment 对应的 baseOffset
val logStartOffset = log.logSegments.head.baseOffset
// 计算下一次执行清理操作的起始 offset
val firstDirtyOffset = {
// 如果 cleaner-offset-checkpoint 中没有当前 topic 分区的相关记录或记录的 offset 小于 logStartOffset,
// 则以当前 Log 对象 SkipList 中的起始 logStartOffset 作为下一次需要被清理的起始 offset 位置
val offset = lastCleanOffset.getOrElse(logStartOffset)
if (offset < logStartOffset) {
// don't bother with the warning if compact and delete are enabled.
if (!isCompactAndDelete(log))
warn(s"Resetting first dirty offset to log start offset $logStartOffset since the checkpointed offset $offset is invalid.")
logStartOffset
} else {
offset
}
}

// 获取需要被清理的 LogSegment 对象,即在 firstDirtyOffset 到 activeSegment 之间的 LogSegment 对象集合
val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)
// 获取配置的清理滞后时间(对应 min.compaction.lag.ms 配置)
val compactionLagMs = math.max(log.config.compactionLagMs, 0L)

// 计算本次不应该被清理的 LogSegment 对应的最小 offset 值
val firstUncleanableDirtyOffset: Long = Seq(
// activeSegment 不能执行清理操作,避免竞态条件
Option(log.activeSegment.baseOffset),

// 寻找最大消息时间戳距离当前时间戳在清理滞后时间(compactionLagMs)范围内的 LogSegment 对应的最小 offset 值
if (compactionLagMs > 0) {
dirtyNonActiveSegments.find { s =>
// 如果 LogSegment 的最大消息时间戳距离当前在 compactionLagMs 范围内,则不能执行清理操作
val isUncleanable = s.largestTimestamp > now - compactionLagMs
debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - compactionLagMs}; is uncleanable=$isUncleanable")
isUncleanable
} map (_.baseOffset)
} else None
).flatten.min

debug(s"Finding range of cleanable offsets for log=${log.name} topicPartition=$topicPartition. Last clean offset=$lastCleanOffset now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset activeSegment.baseOffset=${log.activeSegment.baseOffset}")

(firstDirtyOffset, firstUncleanableDirtyOffset)
}

清理区间的起始 offset,即 firstDirtyOffset,一般都对应着 cleaner-offset-checkpoint 文件中记录的上次执行清理操作的结束 offset,但是考虑到当前 topic 分区可能是第一次执行清理操作,或者 offset 对应的 LogSegment 可能已经被删除,所以需要将其与当前 Log 对象的首个 LogSegment 的 baseOffset 进行对比,选择较大值。

清理区间的结束 offset,即 firstUncleanableDirtyOffset,也就是 uncleanable 区间的起始 offset,我们在前面介绍了 uncleanable 区间包含 2 类 LogSegment 对象,即 activeSegment 对象和最大消息时间戳距离当前时间位于配置的滞后压缩时间范围内的 LogSegment 对象,在计算 firstUncleanableDirtyOffset 时,也就是从这两类 LogSegment 中寻找最小的 baseOffset 作为清理区间的结束 offset 值。

下面来看一下 Cleaner#clean 方法,清理操作的具体执行过程正位于此,方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
// 记录消息清理的状态信息
val stats = new CleanerStats()

info("Beginning cleaning of log %s.".format(cleanable.log.name))
val log = cleanable.log // 需要被清理的 Log 对象

info("Building offset map for %s...".format(cleanable.log.name))
// 清理操作的 offset 上界
val upperBoundOffset = cleanable.firstUncleanableOffset

// 1. 遍历处理待清理区间的 LogSegment 对象,填充 offsetMap 对象,主要记录每个消息 key 及其对应清理区间内的最大 offset 值
this.buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats)
val endOffset = offsetMap.latestOffset + 1
stats.indexDone()

// 2. 计算删除标识
val deleteHorizonMs = log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
case None => 0L
case Some(seg) => seg.lastModified - log.config.deleteRetentionMs // delete.retention.ms
}

// determine the timestamp up to which the log will be cleaned,this is the lower of the last active segment and the compaction lag
val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)

// 3. 对 [0, endOffset) 区间的 LogSegment 进行分组,并以组为单位执行清理操作
info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
for (group <- this.groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset))
this.cleanSegments(log, group, offsetMap, deleteHorizonMs, stats)

// record buffer utilization
stats.bufferUtilization = offsetMap.utilization

stats.allDone()

(endOffset, stats)
}

整个清理过程中我们重点关注一下 offsetMap 的填充过程和分组清理数据的过程,这里的 offsetMap 是一个 Kafka 自定义实现的 SkimpyOffsetMap 类型,其中主要记录了每个消息的 key 和消息在清理区间的最大 offset 值的映射关系,后面需要依据该 offsetMap 来确定需要剔除和保留的消息。填充 offsetMap 的过程位于 Cleaner#buildOffsetMap 方法中,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
private[log] def buildOffsetMap(log: Log, // 待清理的 Log 对象
start: Long, // 清理区间起始 offset
end: Long, // 清理区间结束 offset
map: OffsetMap, // 记录消息 key 及其对应的最大 offset
stats: CleanerStats) {
map.clear()
// 获取 [start, end) 之间的 LogSegment 对象,这些对象是本次需要执行清理操作的
val dirty = log.logSegments(start, end).toBuffer
info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))

var full = false // 标识 map 是否被填充满了
for (segment <- dirty if !full) {
// 检查当前分区的压缩状态,确保不是 LogCleaningAborted 状态
this.checkDone(log.topicPartition)
// 处理当前 LogSegment 中的消息集合,以消息的 key 作为 key,以遍历范围内最大 offset 作为 value,填充 offsetMap
full = this.buildOffsetMapForSegment(log.topicPartition, segment, map, start, log.config.maxMessageSize, stats)
}
info("Offset map for log %s complete.".format(log.name))
}

private def buildOffsetMapForSegment(topicPartition: TopicPartition,
segment: LogSegment,
map: OffsetMap,
start: Long,
maxLogMessageSize: Int,
stats: CleanerStats): Boolean = {
// 获取清理区间起始 offset 对应的消息物理地址
var position = segment.index.lookup(start).position
// 计算当前 map 的最大容量
val maxDesiredMapSize = (map.slots * dupBufferLoadFactor).toInt
// 遍历处理 LogSegment 对象中的消息
while (position < segment.log.sizeInBytes) {
// 再次校验当前分区的状态,确保不是 LogCleaningAborted 状态
this.checkDone(topicPartition)
readBuffer.clear()
// 读取消息集合
segment.log.readInto(readBuffer, position)
val records = MemoryRecords.readableRecords(readBuffer)
throttler.maybeThrottle(records.sizeInBytes)

val startPosition = position
// 深层迭代遍历消息集合
for (entry <- records.deepEntries.asScala) {
val message = entry.record
// 仅处理具备 key,且 offset 位于 start 之后的消息
if (message.hasKey && entry.offset >= start) {
// 如果 map 未满,将消息的 key 及其 offset 放入 map 中,这里会覆盖 offset 较小的 key
if (map.size < maxDesiredMapSize) map.put(message.key, entry.offset)
else return true // 标识 map 已满
}
stats.indexMessagesRead(1)
}
val bytesRead = records.validBytes
// 向前移动地址
position += bytesRead
stats.indexBytesRead(bytesRead)
// 如果 position 未向前移动,则说明未读取到一个完整的消息,需要对 buffer 进行扩容
if (position == startPosition) this.growBuffers(maxLogMessageSize)
} // ~ end while
// 重置 buffer
this.restoreBuffers()
false
}

填充的过程比较直观,上述方法会遍历清理区间的消息集合直到 offsetMap 被填满或到达区间边界为止,并在遍历过程中将持有 key 的消息及其 offset 添加到 offsetMap 中,因为消息是顺序追加的,所以能够保证 offsetMap 中记录的是当前已处理消息的对应的最大 key->offset 映射。

完成了 offsetMap 的填充,接下来方法会依据单个 LogSegment 对象和索引文件的大小上限对需要清理的 LogSegment 对象进行分组,以防止清理操作完成后生成的目标 LogSegment 对象过大或过小,保证尽量均衡。然后方法会遍历每个分组,对分组中的待清理 LogSegment 对象集合调用 Cleaner#cleanSegments 方法执行清理操作并生成最终的 LogSegment 对象替换清理操作前的 LogSegment 对象集合。方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
private[log] def cleanSegments(log: Log,
segments: Seq[LogSegment],
map: OffsetMap,
deleteHorizonMs: Long,
stats: CleanerStats) {
// 创建组内第一个 LogSegment 对象的 log 文件对应的“.cleaned”文件
val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
logFile.delete()
// 创建 index 文件对应的“.cleaned”文件
val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)
// 创建 timeindex 文件对应的“.cleaned”文件
val timeIndexFile = new File(segments.head.timeIndex.file.getPath + Log.CleanedFileSuffix)
indexFile.delete()
timeIndexFile.delete()
val records = FileRecords.open(logFile, false, log.initFileSize(), log.config.preallocate)
val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
val timeIndex = new TimeIndex(timeIndexFile, segments.head.baseOffset, segments.head.timeIndex.maxIndexSize)
// 创建清理后数据对应的 LogSegment 对象
val cleaned = new LogSegment(records, index, timeIndex,
segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)

try {
// 遍历处理需要清理的 LogSegment 对象,将清理后的数据记录到 cleaned 文件中
for (old <- segments) {
val retainDeletes = old.lastModified > deleteHorizonMs
info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
.format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if (retainDeletes) "retaining" else "discarding"))
this.cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
}

// 对 index 文件进行截断,剔除无效的字节
index.trimToValidSize()
// 对 timeindex 文件进行截断,剔除无效的字节
cleaned.onBecomeInactiveSegment()
timeIndex.trimToValidSize()

// 将 LogSegment 对象相关的文件刷盘
cleaned.flush()

// update the modification date to retain the last modified date of the original files
val modified = segments.last.lastModified
cleaned.lastModified = modified

// 使用清理后的 LogSegment 对象替换清理之前的 LogSegment 对象集合
info("Swapping in cleaned segment %d for segment(s) %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name))
log.replaceSegments(cleaned, segments)
} catch {
case e: LogCleaningAbortedException =>
cleaned.delete()
throw e
}
}

整个清理操作的执行流程可以概括如下:

  1. 创建存储清理后数据的 LogSegment 对象,及其对应的 log、index 和 timeindex 文件,注意此时的文件都以 “.cleaned” 作为后缀;
  2. 遍历处理待清理 LogSegment 对象集合,对每个需要清理的 LogSegment 对象调用 Cleaner#cleanInto 方法执行清理操作,并将清理后的数据写入步骤 1 中创建的 LogSegment 对象中;
  3. 对 index 和 timeindex 文件进行截断,剔除无效字节;
  4. 将存储清理后数据的 LogSegment 对象相关文件进行刷盘;
  5. 使用存储清理后数据的 LogSegment 对象替换 SkipList 中对应的被清理之前的 LogSegment 对象集合。

其中步骤 1 中创建的相关文件均以“.cleaned”作为文件名后缀,并在步骤 4 中将内存中的日志和索引数据落盘到对应文件中,而步骤 5 中除了会使用存储清理后数据的 LogSegment 对象替换 SkipList 中对应的被清理之前的 LogSegment 对象集合之外,还会将相关文件的后缀名由“.cleaned”改为“.swap”,并在完成剔除存储被清理之前数据的 LogSegment 对象集合后,移除文件的“.swap”后缀。前面我们在分析 Log#loadSegments 方法时曾说,如果当前 topic 分区目录下的 log 文件是以“.swap”作为后缀的,那么其中的数据是完整的,只是 broker 节点在执行交换(即移除“.swap”后缀)的过程中宕机了,再次加载时可以直接移除“.swap”后缀并加载,无需担心数据错乱或丢失,分析到这里应该对 broker 节点启动时加载数据文件的过程有更加深入的理解。

下面我们主要来看一下 Cleaner#cleanInto 方法的实现,分析清理操作的具体执行过程,方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
private[log] def cleanInto(topicPartition: TopicPartition, // 当前操作的 Log 对应的 topic 分区对象
source: LogSegment, // 需要被清理的 LogSegment
dest: LogSegment, // 清理后得到 LogSegment
map: OffsetMap, // offsetMap
retainDeletes: Boolean, // source.lastModified > deleteHorizonMs,当删除对应的 LogSegment 时,删除标记是否应该被保留
maxLogMessageSize: Int,
stats: CleanerStats) {

// 定义消息过滤器
val logCleanerFilter = new LogEntryFilter {
def shouldRetain(logEntry: LogEntry): Boolean = shouldRetainMessage(source, map, retainDeletes, logEntry, stats)
}

var position = 0
// 遍历处理待清理的 LogSegment 对象中的消息
while (position < source.log.sizeInBytes) {
// 校验对应 topic 分区的清理状态不为 LogCleaningAborted
this.checkDone(topicPartition)
// read a chunk of messages and copy any that are to be retained to the write buffer to be written out
readBuffer.clear()
writeBuffer.clear()

// 读取消息到 buffer
source.log.readInto(readBuffer, position)
val records = MemoryRecords.readableRecords(readBuffer)
throttler.maybeThrottle(records.sizeInBytes)
// 对消息进行过滤,对需要保留的消息写入到 buffer 中
val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize)
stats.readMessages(result.messagesRead, result.bytesRead)
stats.recopyMessages(result.messagesRetained, result.bytesRetained)

position += result.bytesRead

// 对于需要保留的消息,将其追加到清理后的 LogSegment 对象中
val outputBuffer = result.output
if (outputBuffer.position > 0) {
outputBuffer.flip()
val retained = MemoryRecords.readableRecords(outputBuffer)
dest.append(
firstOffset = retained.deepEntries.iterator.next().offset,
largestOffset = result.maxOffset,
largestTimestamp = result.maxTimestamp,
shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
records = retained)
throttler.maybeThrottle(outputBuffer.limit)
}

// 如果未能读取一条完整的消息,则需要对 buffer 进行扩容
if (readBuffer.limit > 0 && result.messagesRead == 0) growBuffers(maxLogMessageSize)
}
// 对 buffer 进行重置
this.restoreBuffers()
}

上述方法会深层遍历待清理 LogSegment 对象中的每一条消息,并调用 MemoryRecords#filterTo 对消息执行过滤操作,保留同时满足以下条件的消息:

  1. 消息必须具备 key,且 key 包含在 offsetMap 中;
  2. 消息的 offset 要大于等于 offsetMap 中记录的对应的 offset 值;
  3. 如果对应的消息是删除标记,只有在允许保留该标记是才会保留。

上述条件对应方法 Cleaner#shouldRetainMessage 实现,这里不再展开。在完成对一个消息集合的筛选操作之后,如果所有的消息均需要被保留,则只需要将消息集合写入到目标 buffer 中即可。否则,如果只有部分消息需要被保留,则需要对这部分保留的消息重新压缩(如果需要的话),然后写入目标 buffer 中。

总结

本文我们按照日志数据的组织结构由下往上分析了 LogSegment、Log 和 LogManager 组件,了解了 Kafka 的日志存储机制,其中 Log 用于存储和管理一个 topic 分区下的所有有效的消息数据,并将消息及其索引数据分片采用 LogSegment 对象进行管理。LogManager 实现了 4 个周期性任务分别用于对日志和索引数据执行定期清理、删除、刷盘,以及记录 HW 等操作,同时还维护了一个清理线程对具备相同 key 的重复消息数据进行清理,以减少对磁盘空间的无用消耗。LogManager 并没有提供对日志数据的读写操作,而是委托给相应 topic 分区的 Log 对象执行。