日志数据(亦称消息数据)的存储机制在 Kafka 整个设计与实现中既基础又核心。Kafka 采用本地文件系统对日志数据进行存储,并允许为一个 broker 节点设置多个 log 文件目录,每个 log 目录下存储的数据又按照 topic 分区进行划分,其中包含了一个 topic 分区名下消息数据对应的多组日志和索引文件。
Kafka 定义了 LogSegment 类和 Log 类对日志和索引数据进行管理,并定义了 LogManager 类管理一个 broker 节点下的所有 Log 对象,同时基于 Log 对象提供了对日志数据的加载、创建、删除,以及查询等功能,同时还维护了多个定时任务对日志数据执行清理、删除、刷盘,以及记录 HW 位置等操作,并提供了对 key 重复的消息数据执行压缩的机制。
上图展示了 topic、partition、replica、Log 和 LogSegment 之间的组织关系。在具体实现时组织如下:
一个 broker 节点允许指定多个 log 目录,每个目录下包含多个以“topic-partition”命名的目录,即一个 log 目录下存储了多个 topic 分区对应的消息数据,并且一个 topic 分区只允许属于一个 log 目录。
每个 topic 分区目录下包含多组日志(log)和索引(index、timeindex)文件,Kafka 定义了 LogSegment 类用于封装一组日志和索引文件。
每个 topic 分区对应一个 Log 类对象(一个 broker 节点上只允许存放分区的一个副本,所以从 broker 视角来看一个分区对应一个 Log 类对象),其中包含了一系列隶属对应 topic 分区的 LogSegment 对象,Log 类采用跳跃表(SkipList)数据结构对这些 LogSegment 对象进行管理。
上图进一步展示了 Log 与 LogSegment 之间的组织关系,以及 LogSegment 在 Log 中基于 SkipList 的组织形式(其中青色小圆圈表示单个 LogSegment 对象)。
LogSegment 组件
每个 topic 分区目录下通常会包含多个 log 文件,这些 log 文件 以其中保存的消息的起始 offset 命名 。每个 log 文件由一个 LogSegment 对象进行管理,其中还包含了对应的 index 和 timeindex 文件。下面是关于某个 topic 分区目录下的文件列表(生产环境中一个 topic 分区目录下一般存在多组类似下面这样的文件):
1 2 $ ls topic-default-0/ 00000000000000000122.index 00000000000000000122.log 00000000000000000122.timeindex
LogSegment 类的字段定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class LogSegment (val log: FileRecords , // log 文件对象 val index: OffsetIndex , // index 文件对象 val timeIndex: TimeIndex , // timeindex 文件对象 val baseOffset: Long , // 当前日志分片文件中第一条消息的 offset 值 val indexIntervalBytes: Int , // 索引项之间间隔的最小字节数,对应 index.interval.bytes 配置 val rollJitterMs: Long , time: Time ) extends Logging { private var created = time.milliseconds private var bytesSinceLastIndexEntry = 0 private var rollingBasedTimestamp: Option [Long ] = None @volatile private var maxTimestampSoFar = timeIndex.lastEntry.timestamp @volatile private var offsetOfMaxTimestamp = timeIndex.lastEntry.offset }
其中 FileRecords 类用于封装和管理对应的 log 文件,OffsetIndex 类用于封装和管理对应的 index 文件,TimeIndex 类用于封装和管理对应的 timeindex 文件。这是支撑 Kafka 日志数据存储的 3 个基础类,要理解 Kafka 的日志存储机制,我们需要先理解这 3 个类的定义。
FileRecords 类用于描述和管理日志(分片)文件数据,对应一个 log 文件,其字段定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class FileRecords extends AbstractRecords implements Closeable { private final boolean isSlice; private final int start; private final int end; private final Iterable<FileChannelLogEntry> shallowEntries; private final AtomicInteger size; private final FileChannel channel; private volatile File file; }
FileRecords 主要定义了对日志数据的追加、读取、删除、查找、截断,以及刷盘等操作,并依赖于 LogEntry 类对单条日志数据的 offset 和 value 进行封装,同时提供了对 log 文件中日志数据的 浅层遍历 和 深层遍历 操作。日志数据在追加到 log 文件中之前可能会执行压缩操作,所谓浅层遍历是指在遍历 log 文件中的日志数据时将压缩后的数据看做是一个整体,而深层遍历则会尝试对这部分日志数据执行解压缩,并返回解压缩后的单条消息。
下面的示例中展示了一个具体的日志文件数据的部分内容(即前面提及的 00000000000000000122.log
文件):
1 2 3 4 5 6 7 8 9 10 11 12 13 LogEntry(122, Record(magic = 1, attributes = 0, compression = NONE, crc = 300223964, CreateTime = 1553937143494, key = 4 bytes, value = 16 bytes)) LogEntry(123, Record(magic = 1, attributes = 0, compression = NONE, crc = 1516889930, CreateTime = 1553937143505, key = 4 bytes, value = 16 bytes)) LogEntry(124, Record(magic = 1, attributes = 0, compression = NONE, crc = 1201423931, CreateTime = 1553937143507, key = 4 bytes, value = 16 bytes)) LogEntry(125, Record(magic = 1, attributes = 0, compression = NONE, crc = 1592544380, CreateTime = 1553937143507, key = 4 bytes, value = 16 bytes)) LogEntry(126, Record(magic = 1, attributes = 0, compression = NONE, crc = 599198486, CreateTime = 1553937143508, key = 4 bytes, value = 16 bytes)) LogEntry(127, Record(magic = 1, attributes = 0, compression = NONE, crc = 980691361, CreateTime = 1553937143509, key = 4 bytes, value = 16 bytes)) LogEntry(128, Record(magic = 1, attributes = 0, compression = NONE, crc = 4047753804, CreateTime = 1553937143511, key = 4 bytes, value = 16 bytes)) LogEntry(129, Record(magic = 1, attributes = 0, compression = NONE, crc = 4289660679, CreateTime = 1553937143511, key = 4 bytes, value = 16 bytes)) LogEntry(130, Record(magic = 1, attributes = 0, compression = NONE, crc = 4016824904, CreateTime = 1553937143512, key = 4 bytes, value = 16 bytes)) LogEntry(131, Record(magic = 1, attributes = 0, compression = NONE, crc = 3305927143, CreateTime = 1553937143512, key = 4 bytes, value = 16 bytes)) LogEntry(132, Record(magic = 1, attributes = 0, compression = NONE, crc = 3847705666, CreateTime = 1553937143513, key = 4 bytes, value = 16 bytes)) ...
上面的示例中我们基于深层遍历调用 LogEntry#toString
方法打印了单条消息的概要信息。
OffsetIndex 类用于描述和管理索引文件数据,定义了对 index 文件的检索、追加,以及截断等功能。一个 OffsetIndex 对象对应一个 index 文件,用于提高消息检索的性能。下面的示例中展示了一个具体的 index 文件数据的部分内容(即前面提及的 00000000000000000122.index
文件):
1 2 3 4 5 6 7 8 9 10 11 12 165, 8910 252, 13608 355, 19170 658, 35532 961, 51894 1191, 64314 1494, 80676 1797, 97038 2100, 113400 2403, 129762 ...
OffsetIndex 的索引项由 8 个字节构成,其中前面 4 个字节表示消息的相对 offset,后面 4 个字节表示消息所在文件的物理地址(position),其中相对 offset 参考的偏移量是对应文件的起始 offset,这样的设计将原本 long 类型(8 字节)的消息 offset 转换成 int 类型(4 字节)的相对 offset 进行存储,能够减少空间占用。此外,Kafka 在构造 index 文件(包括下面要介绍的 timeindex 文件)时并不会针对每个 offset 都建立对应的索引项,而是采用隔一段区间打一个点的稀疏索引机制,以进一步减少对磁盘空间的消耗。
TimeIndex 类同样用于描述和管理索引文件数据,提供了基于时间戳检索日志数据的功能,对应 timeindex 文件。区别于 OffsetIndex 的地方在于 TimeIndex 的索引项由 12 个字节构成,其中前面 8 个字节表示当前 offset 之前已追加消息的最大时间戳(毫秒),后面 4 个字节表示相对 offset,等价于 OffsetIndex 索引项的前 4 个字节。下面的示例中展示了一个具体的 timeindex 文件数据的部分内容(即前面提及的 00000000000000000122.timeindex
文件):
1 2 3 4 5 6 7 8 9 10 11 12 1553937143565, 251 1553937143570, 284 1553937143594, 649 1553937143609, 944 1553937143631, 1166 1553937143652, 1483 1553937143669, 1770 1553937143691, 2096 1553937143707, 2378 1553937143714, 2676 ...
LogSegment 可以看做是对一组日志和索引文件数据的封装,并提供了对这些数据执行追加、读取、截断、删除、刷盘,以及重建等功能。本小节接下来的内容,我们重点分析一下 LogSegment 中主要的日志和索引文件数据操作方法,包括:LogSegment#append
、LogSegment#read
和 LogSegment#recover
方法,其它方法在实现上都比较简单,读者要是感兴趣的话可以自己阅读源码。
追加日志数据
本小节来看一下 LogSegment#append
方法的实现,该方法用于往当前 LogSegment 对应的 log 文件中追加消息数据,并在需要时更新对应的 index 和 timeindex 索引数据。方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 def append (firstOffset: Long , largestOffset: Long , largestTimestamp: Long , shallowOffsetOfMaxTimestamp: Long , records: MemoryRecords ) { if (records.sizeInBytes > 0 ) { trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d" .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp)) val physicalPosition = log.sizeInBytes() if (physicalPosition == 0 ) rollingBasedTimestamp = Some (largestTimestamp) require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset." ) val appendedBytes = log.append(records) trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset " ) if (largestTimestamp > maxTimestampSoFar) { maxTimestampSoFar = largestTimestamp offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp } if (bytesSinceLastIndexEntry > indexIntervalBytes) { index.append(firstOffset, physicalPosition) timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp) bytesSinceLastIndexEntry = 0 } bytesSinceLastIndexEntry += records.sizeInBytes } }
如果当前追加的消息数据是有效的,则 LogSegment 会调用 FileRecords#append
方法将消息数据追加到对应的 log 文件中,并更新本地记录的已追加消息的最大时间戳及其 offset。前面我们介绍了 Kafka 并不会对每条消息都建立索引,而是采用稀疏索引的策略间隔指定大小的字节数(对应 index.interval.bytes
配置)建立索引项,如果当前累计追加的消息字节数超过该配置值,则 Kafka 会更新对应的 index 和 timeindex 数据。
读取日志数据
下面来看一下 LogSegment#read
方法,该方法用于从 LogSegment 对应的 log 文件中读取指定区间的消息数据,读取的消息内容由 startOffset、maxOffset、maxSize 和 maxPosition 这 4 个参数确定。方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 def read (startOffset: Long , maxOffset: Option [Long ], maxSize: Int , maxPosition: Long = size, minOneMessage: Boolean = false ): FetchDataInfo = { if (maxSize < 0 ) throw new IllegalArgumentException ("Invalid max size for log read (%d)" .format(maxSize)) val logSize = log.sizeInBytes val startOffsetAndSize = this .translateOffset(startOffset) if (startOffsetAndSize == null ) return null val startPosition = startOffsetAndSize.position val offsetMetadata = new LogOffsetMetadata (startOffset, baseOffset, startPosition) val adjustedMaxSize = if (minOneMessage) math.max(maxSize, startOffsetAndSize.size) else maxSize if (adjustedMaxSize == 0 ) return FetchDataInfo (offsetMetadata, MemoryRecords .EMPTY ) val length = maxOffset match { case None => min((maxPosition - startPosition).toInt, adjustedMaxSize) case Some (offset) => if (offset < startOffset) return FetchDataInfo (offsetMetadata, MemoryRecords .EMPTY ) val mapping = this .translateOffset(offset, startPosition) val endPosition = if (mapping == null ) logSize else mapping.position min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt } FetchDataInfo ( offsetMetadata, log.read(startPosition, length), firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size) }
上述方法的主要逻辑在于确定读取消息的起始位置和读取长度,并最终需要调用 FileRecords#read
方法读取消息数据,该方法接收 2 个参数:position 和 size。参数 position 指代读取消息的起始物理地址,而 size 指代读取消息的字节数,而上述方法的主要逻辑就在于基于参数给定的 4 个坐标来确定 position 和 size 值。
参数 startOffset 设置了当前要读取的消息的起始相对 offset,而 position 是物理地址,所以需要调用 LogSegment#translateOffset
方法进行转换,该方法基于 二分查找算法 从 index 文件中获取小于等于 startOffset 的最大 offset 对应的物理地址。实现如下:
1 2 3 4 5 6 private [log] def translateOffset (offset: Long , startingFilePosition: Int = 0 ): LogEntryPosition = { val mapping = index.lookup(offset) log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition)) }
确定好读取的起始物理地址之后,接下来就需要计算读取的消息字节数 size 值,另外 3 个参数(maxOffset、maxSize 和 maxPosition)用来约束生成 size 值,策略如下:
如果未指定 maxOffset,则 size 等于 max((maxPosition - startPosition), maxSize)
;
如果指定了 maxOffset,需要保证 maxOffset 大于等于 startOffset,然后获取 maxOffset 对应的物理地址,并将该物理地址与 maxPosition 进行比较,选择较小的一个与 startPosition 计算得到对应的 size 值,并保证该 size 值不超过 maxSize。
如果能够基于参数计算得到正确的 position 和 size 值,则方法会依据这两个值调用 FileRecords#read
方法读取对应的消息数据,并封装成 FetchDataInfo 对象返回。
重建索引数据
最后来看一下 LogSegment#recover
方法,该方法用于对 log 文件重建相应的 index 和 timeindex 文件,并校验 log 中数据的有效性。方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 def recover (maxMessageSize: Int ): Int = { index.truncate() index.resize(index.maxIndexSize) timeIndex.truncate() timeIndex.resize(timeIndex.maxIndexSize) var validBytes = 0 var lastIndexEntry = 0 maxTimestampSoFar = Record .NO_TIMESTAMP try { for (entry <- log.shallowEntries(maxMessageSize).asScala) { val record = entry.record record.ensureValid() if (record.timestamp > maxTimestampSoFar) { maxTimestampSoFar = record.timestamp offsetOfMaxTimestamp = entry.offset } if (validBytes - lastIndexEntry > indexIntervalBytes) { val startOffset = entry.firstOffset index.append(startOffset, validBytes) timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp) lastIndexEntry = validBytes } validBytes += entry.sizeInBytes() } } catch { case e: CorruptRecordException => logger.warn("Found invalid messages in log segment %s at byte offset %d: %s." .format(log.file.getAbsolutePath, validBytes, e.getMessage)) } val truncated = log.sizeInBytes - validBytes log.truncateTo(validBytes) index.trimToValidSize() timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true ) timeIndex.trimToValidSize() truncated }
重建的过程实际上就是遍历 log 文件,并依据设置的索引项最小间隔字节数(对应 index.interval.bytes
配置)区间建立稀疏索引,期间会基于 Record#ensureValid
方法采用 CRC 校验消息数据的有效性,如果存在无效的数据,则退出循环并移除之后的日志和索引。
Log 组件
在一个 log 目录下存在多个以“topic-partition”命名的分区目录,每个 topic 分区对应一个 Log 对象(更准确来说是一个分区副本对应一个 Log 对象),用于管理名下的 LogSegment 对象集合, Log 类使用 SkipList 数据结构对 LogSegment 进行组织和管理 。在 SkipList 中以 LogSegment 的 baseOffset 为 key,以 LogSegment 对象自身作为 value。当读取消息数据时,我们可以基于 offset 快速定位到对应的 LogSegment 对象,然后调用 LogSegment#read
方法读取消息数据。当写入消息时,Kafka 并不允许向 SkipList 中的任意一个 LogSegment 对象追加数据,而只允许往 SkipList 中的最后一个 LogSegment 追加数据,Log 类提供了 Log#activeSegment
用于获取该 LogSegment 对象,称之为 activeSegment。
Log 类的字段定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 class Log (@volatile var dir: File , // 当前 Log 对象对应的 topic 分区目录 @volatile var config: LogConfig , // 配置信息 @volatile var recoveryPoint: Long = 0L, // 恢复操作的起始 offset,即 HW 位置,之前的消息已经全部落盘 scheduler: Scheduler , // 定时任务调度器 time: Time = Time .SYSTEM ) extends Logging with KafkaMetricsGroup { private val lastflushedTime = new AtomicLong (time.milliseconds) @volatile private var nextOffsetMetadata: LogOffsetMetadata = _ private val segments: ConcurrentNavigableMap [java.lang.Long , LogSegment ] = new ConcurrentSkipListMap [java.lang.Long , LogSegment ] val topicPartition: TopicPartition = Log .parseTopicPartitionName(dir) private val tags = Map ("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString) def name : String = dir.getName }
初始化加载日志数据
Log 类在实例化时会调用 Log#loadSegments
方法加载对应 topic 分区目录下的 log、index 和 timeindex 文件。该方法主要做了以下 4 件事情:
删除标记为 deleted 或 cleaned 的文件,将标记为 swap 的文件加入到交换集合中,等待后续继续完成交换过程;
加载 topic 分区目录下全部的 log 文件和 index 文件,如果对应的 index 不存在或数据不完整,则重建;
遍历处理 1 中记录的 swap 文件,使用压缩后的 LogSegment 替换压缩前的 LogSegment 集合,并删除压缩前的日志和索引文件;
后处理,如果对应 SkipList 为空则新建一个空的 activeSegment,如果不为空则校验 recoveryPoint 之后数据的完整性。
方法 Log#loadSegments
的实现比较冗长,下面我们分步骤逐一分析各个过程,首先来看 步骤 1 ,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 for (file <- dir.listFiles if file.isFile) { if (!file.canRead) throw new IOException ("Could not read file " + file) val filename = file.getName if (filename.endsWith(DeletedFileSuffix ) || filename.endsWith(CleanedFileSuffix )) { file.delete() } else if (filename.endsWith(SwapFileSuffix )) { val baseName = new File (CoreUtils .replaceSuffix(file.getPath, SwapFileSuffix , "" )) if (baseName.getPath.endsWith(IndexFileSuffix )) { file.delete() } else if (baseName.getPath.endsWith(LogFileSuffix )) { val index = new File (CoreUtils .replaceSuffix(baseName.getPath, LogFileSuffix , IndexFileSuffix )) index.delete() swapFiles += file } } }
这一步会遍历当前 topic 分区目录下的文件,并处理标记为 deleted、cleaned 和 swap 的文件(以这些名称作为文件后缀名)。这 3 类文件对应的含义为:
deleted 文件 :标识需要被删除的 log 文件和 index 文件。
cleaned 文件 :在执行日志压缩过程中宕机,文件中的数据状态不明确,无法正确恢复的文件。
swap 文件 :完成执行日志压缩后的文件,但是在替换原文件时宕机。
针对 deleted 和 cleaned 文件直接删除即可,对于 swap 文件来说,因为其中的数据是完整的,所以可以继续使用,只需再次完成 swap 操作即可。Kafka 针对 swap 文件的处理策略为:
如果 swap 文件是 log 文件,则删除对应的 index 文件,稍后的 swap 操作会重建索引。
如果 swap 文件是 index 文件,则直接删除,后续加载 log 文件时会重建索引。
完成了对于一些异常状态文件的处理,步骤 2 开始真正执行加载 log 和 index 文件的操作,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 for (file <- dir.listFiles if file.isFile) { val filename = file.getName if (filename.endsWith(IndexFileSuffix ) || filename.endsWith(TimeIndexFileSuffix )) { val logFile = if (filename.endsWith(TimeIndexFileSuffix )) new File (file.getAbsolutePath.replace(TimeIndexFileSuffix , LogFileSuffix )) else new File (file.getAbsolutePath.replace(IndexFileSuffix , LogFileSuffix )) if (!logFile.exists) { warn("Found an orphaned index file, %s, with no corresponding log file." .format(file.getAbsolutePath)) file.delete() } } else if (filename.endsWith(LogFileSuffix )) { val start = filename.substring(0 , filename.length - LogFileSuffix .length).toLong val indexFile = Log .indexFilename(dir, start) val timeIndexFile = Log .timeIndexFilename(dir, start) val indexFileExists = indexFile.exists() val timeIndexFileExists = timeIndexFile.exists() val segment = new LogSegment ( dir = dir, startOffset = start, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, time = time, fileAlreadyExists = true ) if (indexFileExists) { try { segment.index.sanityCheck() if (!timeIndexFileExists) segment.timeIndex.resize(0 ) segment.timeIndex.sanityCheck() } catch { case e: java.lang.IllegalArgumentException => warn(s"Found a corrupted index file due to ${e.getMessage} }. deleting ${timeIndexFile.getAbsolutePath} , " + s"${indexFile.getAbsolutePath} and rebuilding index..." ) indexFile.delete() timeIndexFile.delete() segment.recover(config.maxMessageSize) } } else { error("Could not find index file corresponding to log file %s, rebuilding index..." .format(segment.log.file.getAbsolutePath)) segment.recover(config.maxMessageSize) } segments.put(start, segment) } }
如果当前文件是 index 文件,但对应的 log 文件不存在,则直接删除,因为没有继续保留的意义。如果当前是 log 文件,则这一步会创建 log 文件对应的 LogSegment 对象并记录到 SkipList 中。期间会校验 log 文件对应的 index 和 timeindex 文件,如果索引文件不存在或其中的数据不完整,则会调用前面介绍的 LogSegment#recover
方法重建索引。
步骤 1 中将需要继续执行 swap 操作的文件记录到了 swapFiles 集合中, 步骤 3 的逻辑就是继续完成 swap 操作,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 for (swapFile <- swapFiles) { val logFile = new File (CoreUtils .replaceSuffix(swapFile.getPath, SwapFileSuffix , "" )) val fileName = logFile.getName val startOffset = fileName.substring(0 , fileName.length - LogFileSuffix .length).toLong val indexFile = new File (CoreUtils .replaceSuffix(logFile.getPath, LogFileSuffix , IndexFileSuffix ) + SwapFileSuffix ) val index = new OffsetIndex (indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize) val timeIndexFile = new File (CoreUtils .replaceSuffix(logFile.getPath, LogFileSuffix , TimeIndexFileSuffix ) + SwapFileSuffix ) val timeIndex = new TimeIndex (timeIndexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize) val swapSegment = new LogSegment (FileRecords .open(swapFile), index = index, timeIndex = timeIndex, baseOffset = startOffset, indexIntervalBytes = config.indexInterval, rollJitterMs = config.randomSegmentJitter, time = time) info("Found log file %s from interrupted swap operation, repairing." .format(swapFile.getPath)) swapSegment.recover(config.maxMessageSize) val oldSegments = this .logSegments(swapSegment.baseOffset, swapSegment.nextOffset()) this .replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true ) }
在完成对日志数据的压缩操作后,会将压缩的结果先保存为 swap 文件(以“.swap”作为文件后缀),并最终替换压缩前的日志文件,所以 swap 文件中的数据都是完整,只需要移除对应的“.swap”后缀,并构建对应的 LogSegment 对象即可。但是这里不能简单的将对应的 LogSegment 对象记录到 SkipList 中就万事大吉了,因为 SkipList 中还存在着压缩前的原文件对应的 LogSegment 对象集合,所以需要先将这些 LogSegment 对象集合及其对应的 log 文件和索引文件删除,这也是 Log#replaceSegments
方法的主要逻辑。
完成了前 3 步的工作, 步骤 4 会对前面加载的数据进行校验,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 if (logSegments.isEmpty) { segments.put(0 L, new LogSegment (dir = dir, startOffset = 0 , indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, time = time, fileAlreadyExists = false , initFileSize = this .initFileSize(), preallocate = config.preallocate)) } else { if (!dir.getAbsolutePath.endsWith(Log .DeleteDirSuffix )) { this .recoverLog() activeSegment.index.resize(config.maxIndexSize) activeSegment.timeIndex.resize(config.maxIndexSize) } }
如果前面的步骤中并未加载到任何数据,则对应的 SkipList 是空的,为了保证 SkipList 能够正常工作,需要为其添加一个空的 activeSegment 对象。如果 SkipList 不为空则需要依据 log 目录下是否存在“.kafka_cleanshutdown”文件来判定之前 broker 是否是正常关闭的,如果为非正常关闭则需要对 recoveryPoint 之后的数据进行校验,如果数据存在不完整则进行丢弃,相关实现位于 Log#recoverLog
中,比较简单,不再展开。
追加日志数据
Log 类定义了 Log#append
方法,用于往 Log 对象中追加消息数据。需要注意的一点是,Log 对象使用 SkipList 管理多个 LogSegment,我们在执行追加消息时是不能够往 SkipList 中的任意 LogSegment 对象执行追加操作的,Kafka 设计仅允许往 activeSegment 对象中追加消息。方法 Log#append
实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 def append (records: MemoryRecords , assignOffsets: Boolean = true ): LogAppendInfo = { val appendInfo = this .analyzeAndValidateRecords(records) if (appendInfo.shallowCount == 0 ) return appendInfo var validRecords = this .trimInvalidBytes(records, appendInfo) try { lock synchronized { if (assignOffsets) { val offset = new LongRef (nextOffsetMetadata.messageOffset) appendInfo.firstOffset = offset.value val now = time.milliseconds val validateAndOffsetAssignResult = try { LogValidator .validateMessagesAndAssignOffsets( validRecords, offset, now, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact, config.messageFormatVersion.messageFormatVersion, config.messageTimestampType, config.messageTimestampDifferenceMaxMs) } catch { case e: IOException => throw new KafkaException ("Error in validating messages while appending to log '%s'" .format(name), e) } validRecords = validateAndOffsetAssignResult.validatedRecords appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp appendInfo.lastOffset = offset.value - 1 if (config.messageTimestampType == TimestampType .LOG_APPEND_TIME ) appendInfo.logAppendTime = now if (validateAndOffsetAssignResult.messageSizeMaybeChanged) { for (logEntry <- validRecords.shallowEntries.asScala) { if (logEntry.sizeInBytes > config.maxMessageSize) { BrokerTopicStats .getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes) BrokerTopicStats .getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes) throw new RecordTooLargeException ( "Message size is %d bytes which exceeds the maximum configured message size of %s." .format(logEntry.sizeInBytes, config.maxMessageSize)) } } } } else { if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset) throw new IllegalArgumentException ("Out of order offsets found in " + records.deepEntries.asScala.map(_.offset)) } if (validRecords.sizeInBytes > config.segmentSize) { throw new RecordBatchTooLargeException ( "Message set size is %d bytes which exceeds the maximum configured segment size of %s." .format(validRecords.sizeInBytes, config.segmentSize)) } val segment = this .maybeRoll( messagesSize = validRecords.sizeInBytes, maxTimestampInMessages = appendInfo.maxTimestamp, maxOffsetInMessages = appendInfo.lastOffset) segment.append( firstOffset = appendInfo.firstOffset, largestOffset = appendInfo.lastOffset, largestTimestamp = appendInfo.maxTimestamp, shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp, records = validRecords) this .updateLogEndOffset(appendInfo.lastOffset + 1 ) trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s" .format(this .name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords)) if (unflushedMessages >= config.flushInterval) this .flush() appendInfo } } catch { case e: IOException => throw new KafkaStorageException ("I/O exception in append to log '%s'" .format(name), e) } }
追加消息数据操作的整体执行流程可以概括为:
解析并校验待追加的消息集合,将其封装成 LogAppendInfo 对象;
剔除待追加消息集合中未通过验证的字节部分;
如果指定需要为消息分配 offset,则对消息(包括压缩后的)执行分配 offset 操作,并对消息执行 magic 值统一、数据完整性校验,以及按需更新消息时间戳等操作;
如果指定不需要为消息分配 offset,则需要保证消息已有 offset 是单调递增,且起始 offset 不能小于当前 Log 对象中记录的下一条待追加消息的 offset;
校验处理后消息集合的总长度,保证不超过单个 LogSegment 对象所允许的最大长度;
获取目标 activeSegment 对象,如果需要则创建一个新的 activeSegment 对象并返回;
往目标 activeSegment 对象中追加消息数据,并更新当前 Log 对象中记录的下一条待追加消息的 offset 值;
如果当前时间距离上次执行刷盘操作的时间超过配置的时间间隔,则执行刷盘操作。
下面我们分步骤对整个执行过程进行进一步分析,首先来看 步骤 1 ,实现位于 Log#analyzeAndValidateRecords
方法中,该方法对待追加的消息集合中的消息逐条进行解析和验证,并封装成 LogAppendInfo 对象返回。实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 private def analyzeAndValidateRecords (records: MemoryRecords ): LogAppendInfo = { var shallowMessageCount = 0 var validBytesCount = 0 var firstOffset = -1 L var lastOffset = -1 L var sourceCodec: CompressionCodec = NoCompressionCodec var monotonic = true var maxTimestamp = Record .NO_TIMESTAMP var offsetOfMaxTimestamp = -1 L for (entry <- records.shallowEntries.asScala) { if (firstOffset < 0 ) firstOffset = entry.offset if (lastOffset >= entry.offset) monotonic = false lastOffset = entry.offset val record = entry.record val messageSize = entry.sizeInBytes if (messageSize > config.maxMessageSize) { BrokerTopicStats .getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes) BrokerTopicStats .getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes) throw new RecordTooLargeException ("Message size is %d bytes which exceeds the maximum configured message size of %s." .format(messageSize, config.maxMessageSize)) } record.ensureValid() if (record.timestamp > maxTimestamp) { maxTimestamp = record.timestamp offsetOfMaxTimestamp = lastOffset } shallowMessageCount += 1 validBytesCount += messageSize val messageCodec = CompressionCodec .getCompressionCodec(record.compressionType.id) if (messageCodec != NoCompressionCodec ) sourceCodec = messageCodec } val targetCodec = BrokerCompressionCodec .getTargetCompressionCodec(config.compressionType, sourceCodec) LogAppendInfo (firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, Record .NO_TIMESTAMP , sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic) }
概括来说,上述方法主要做了以下 3 件事情:
对待追加消息集合中的每条消息执行 CRC 校验;
对待追加消息集合中的每条消息的长度进行校验,保证不超过允许的最大值(对应 max.message.bytes
配置);
计算待追加消息集合中的 firstOffset、lastOffset、消息的条数、有消息的字节数、offset 是否单调递增,以及获取生产者所指定的消息压缩方式。
步骤 2 会依据步骤 1 中对消息的校验结果,对未通过验证的消息字节部分进行截断,实现位于 Log#trimInvalidBytes
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private def trimInvalidBytes (records: MemoryRecords , info: LogAppendInfo ): MemoryRecords = { val validBytes = info.validBytes if (validBytes < 0 ) throw new CorruptRecordException ( "Illegal length of message set " + validBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests" ) if (validBytes == records.sizeInBytes) { records } else { val validByteBuffer = records.buffer.duplicate() validByteBuffer.limit(validBytes) MemoryRecords .readableRecords(validByteBuffer) } }
如果在调用 Log#append
方法时设置了参数 assignOffsets = true
,则在追加消息数据之前会为消息重新分配 offset(对应 步骤 3 ),起始 offset 为当前 Log 对象中记录的下一条待追加消息的 offset 值。这一步主要做了以下几件事情:
更新待追加消息集合的 firstOffset 为当前 Log 对象中记录的下一条待追加消息对应的 offset 值;
对消息(包括压缩后的)的 magic 值进行统一,验证数据完整性,并分配 offset,同时按要求更新消息的时间戳;
更新待追加消息集合的 lastOffset 值;
如果配置了 message.timestamp.type=LogAppendTime
,则设置日志追加时间戳;
对待追加消息集合中的消息进行逐条校验,避免存在过长的消息。
我们重点看一下第 2 步,这一步会执行 offset 分配操作,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private [kafka] def validateMessagesAndAssignOffsets (records: MemoryRecords , offsetCounter: LongRef , now: Long , sourceCodec: CompressionCodec , targetCodec: CompressionCodec , compactedTopic: Boolean = false , messageFormatVersion: Byte = Record .CURRENT_MAGIC_VALUE , messageTimestampType: TimestampType , messageTimestampDiffMaxMs: Long ): ValidationAndOffsetAssignResult = { if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec ) { if (!records.hasMatchingShallowMagic(messageFormatVersion)) { convertAndAssignOffsetsNonCompressed( records, offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs, messageFormatVersion) } else { assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, messageTimestampType, messageTimestampDiffMaxMs) } } else { validateMessagesAndAssignOffsetsCompressed( records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic, messageFormatVersion, messageTimestampType, messageTimestampDiffMaxMs) } }
由上面的实现可以看到,不管消息是否经过压缩,如果指定了需要为消息分配 offset,则需要处理所有的消息,包括经过压缩过的消息。方法 LogValidator#validateMessagesAndAssignOffsets
的主要工作也就是依据消息是否被压缩来分别调用对应的方法对待追加消息统一 magic 值,并执行 offset 分配、数据完整性校验,以及按需更新消息时间戳等操作,如果消息是经过压缩的,那么会对其进行解压缩。相关的方法实现比较冗长,这里不再继续深入。
如果指定不需要重新分配 offset 值,那么处理过程将会简单很多,仅仅需要验证消息已有的 offset 是否是单调递增的,并且待追加消息集合中消息的 firstOffset 不能小于 Log 对象中记录的下一条待追加消息的 offset 值,否则说明待追加的消息集合是非法的,这也是 步骤 4 的主要工作。
步骤 5 会校验处理后消息集合的长度,保证不超过单个 LogSegment 对象所允许的最大长度(对应 segment.bytes
配置)。
在完成了一系列准备工作之后,接下去可以将处理后的待追加消息数据写入 activeSegment 对象中。 步骤 6 调用了 Log#maybeRoll
方法尝试从 SkipList 中获取目标 activeSegment 对象,并在需要时创建新的 activeSegment 对象。方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private def maybeRoll (messagesSize: Int , maxTimestampInMessages: Long , maxOffsetInMessages: Long ): LogSegment = { val segment = activeSegment val now = time.milliseconds val reachedRollMs = segment.timeWaitedForRoll(now, maxTimestampInMessages) > config.segmentMs - segment.rollJitterMs if (segment.size > config.segmentSize - messagesSize || (segment.size > 0 && reachedRollMs) || segment.index.isFull || segment.timeIndex.isFull || !segment.canConvertToRelativeOffset(maxOffsetInMessages)) { this .roll(maxOffsetInMessages - Integer .MAX_VALUE ) } else { segment } }
如果满足以下条件之一,则会创建一个新的 activeSegment 对象:
当前 activeSegment 对象在追加本次消息之后,长度超过 LogSegment 允许的最大值(对应 segment.bytes
配置)。
当前 activeSegment 对象的存活时间超过了允许的最大时间(对应 segment.ms
配置)。
对应的索引文件(index 和 timeindex)满了。
创建新 activeSegment 对象的过程位于 Log#roll
方法中,这里先不展开,后面会专门进行分析。
既然已经拿到了目标 activeSegment 对象,那么下一步( 步骤 7 )就是将待追加的消息数据写入 activeSegment 对象中(调用 LogSegment#append
方法,前面已经分析过)。写入成功之后需要更新 Log 对象本地记录的下一条待追加消息对应的 offset 值。
最后( 步骤 8 ),方法会检测当前时间距离上一次执行刷盘的时间是否超过配置的时间间隔(对应 flush.messages
配置),是则执行刷盘操作。相关实现位于 Log#flush
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def flush (): Unit = this .flush(this .logEndOffset)def flush (offset: Long ): Unit = { if (offset <= recoveryPoint) return debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " + time.milliseconds + " unflushed = " + unflushedMessages) for (segment <- this .logSegments(recoveryPoint, offset)) segment.flush() lock synchronized { if (offset > recoveryPoint) { this .recoveryPoint = offset lastflushedTime.set(time.milliseconds) } } }
执行刷盘操作之前会先将当前 offset 与 recoveryPoint 变量进行比较,这里的 offset 对应当前 Log 对象中记录的下一条待追加消息的 offset,而 recoveryPoint 变量在当前 Log 对象创建时指定,并在运行过程中更新,用于表示当前已经刷盘的日志数据对应的最大 offset 值。如果当前 offset 小于等于 recoveryPoint,则无需执行刷盘操作,因为 recoveryPoint 之前的数据已经全部落盘了。否则会调用 Log#logSegments
方法从当前 Log 对象的 SkipList 中获取位于 [recoveryPoint, offset)
区间的 LogSegment 对象集合,并应用 LogSegment#flush
方法对 LogSegment 相关的文件执行刷盘操作,包括 log、index 和 timeindex 文件。同时会更新 recoveryPoint 和 lastflushedTime 字段,后者用于记录最近一次执行刷盘操作的时间戳。
创建 Active Segment 对象
既然上一小节提到了 Log#roll
方法,那么本小节就来分析一下该方法的实现,该方法用于创建一个新的 activeSegment 对象,并将上任的 activeSegment 对象中的数据落盘。方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 def roll (expectedNextOffset: Long = 0 ): LogSegment = { val start = time.nanoseconds lock synchronized { val newOffset = Math .max(expectedNextOffset, logEndOffset) val logFile = Log .logFile(dir, newOffset) val indexFile = indexFilename(dir, newOffset) val timeIndexFile = timeIndexFilename(dir, newOffset) for (file <- List (logFile, indexFile, timeIndexFile); if file.exists) { warn("Newly rolled segment file " + file.getName + " already exists; deleting it first" ) file.delete() } segments.lastEntry() match { case null => case entry => val seg: LogSegment = entry.getValue seg.onBecomeInactiveSegment() seg.index.trimToValidSize() seg.timeIndex.trimToValidSize() seg.log.trim() } val segment = new LogSegment ( dir, startOffset = newOffset, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, time = time, fileAlreadyExists = false , initFileSize = initFileSize(), preallocate = config.preallocate) val prev = this .addSegment(segment) if (prev != null ) throw new KafkaException ("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists." .format(name, newOffset)) this .updateLogEndOffset(nextOffsetMetadata.messageOffset) scheduler.schedule("flush-log" , () => this .flush(newOffset)) info("Rolled new log segment for '" + name + "' in %.0f ms." .format((System .nanoTime - start) / (1000.0 * 1000.0 ))) segment } }
创建一个新的 activeSegment 对象的过程比较直观,无非是创建一个新的 activeSegment 对象,并将其添加到 SkipList 中,同时需要更新 Log 对象本地记录的 activeSegment 对象的 baseOffset 及其物理地址。此外,我们需要将上一任 activeSegment 对象中的数据落盘,Kafka 为此注册了一个名为 flush-log 的定时任务异步处理该过程,需要注意的是这里的 flush-log 任务仅运行一次。这里的刷盘操作是将 recoveryPoint 到新 activeSegment 对象 baseOffset (不包括)之间的数据落盘,具体的落盘操作交由 Log#flush
方法执行,我们在前面已经分析过该方法,这里不再重复撰述。
读取日志数据
下面接着来看一下从 Log 对象中读取日志数据的过程,位于 Log#read
方法中。不同于追加消息时只能操作 activeSegment 对象,读取消息可以从 SkipList 中任意一个 LogSegment 对象中进行读取。方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 def read (startOffset: Long , maxLength: Int , maxOffset: Option [Long ] = None , minOneMessage: Boolean = false ): FetchDataInfo = { trace("Reading %d bytes from offset %d in log %s of length %d bytes" .format(maxLength, startOffset, name, size)) val currentNextOffsetMetadata = nextOffsetMetadata val next = currentNextOffsetMetadata.messageOffset if (startOffset == next) return FetchDataInfo (currentNextOffsetMetadata, MemoryRecords .EMPTY ) var entry = segments.floorEntry(startOffset) if (startOffset > next || entry == null ) throw new OffsetOutOfRangeException ("Request for offset %d but we only have log segments in the range %s to %d." .format(startOffset, segments.firstKey, next)) while (entry != null ) { val maxPosition = { if (entry == segments.lastEntry) { val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong if (entry != segments.lastEntry) entry.getValue.size else exposedPos } else { entry.getValue.size } } val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage) if (fetchInfo == null ) { entry = segments.higherEntry(entry.getKey) } else { return fetchInfo } } FetchDataInfo (nextOffsetMetadata, MemoryRecords .EMPTY ) }
读取日志数据的执行过程如代码注释,比较直观,在做好边界检查的前提下寻找小于 startOffset 的最大 baseOffset,并以此 offset 开始从 SkipList 中定位 LogSegment 对象,如果该 LogSegment 对象为空,则会继续读取下一个 LogSegment 对象。读取的过程区分是不是 activeSegment 对象,如果当前读取的 LogSegment 不是 activeSegment 对象,那么对应的 LogSegment 已经是“冷却”状态,所以我们可以直接将其中的数据全部读取出来返回,如果当前读取的是 activeSegment 对象,则需要以 Log 对象中记录的 activeSegment 对象的最大物理地址作为读取的上界,如果直接读取到 activeSegment 对象结尾可能导致 OffsetOutOfRangeException 异常。考虑下面这样一个场景(假设有读线程 A 和写线程 B):
A 线程请求读取 startOffset 为 101 之后的数据,刚好该请求落在了 activeSegment 对象上;
B 线程调用 append 方法追加了 offset 为 [105, 109] 的消息集合,但是还未更新 Log 对象本地记录的下一条消息对应的 offset 值(此时仍为 105);
A 线程读取到了 [101, 109] 之间的数据,并且继续请求 startOffset 为 110 之后的数据,但是因为 startOffset > next
而抛出 OffsetOutOfRangeException 异常。
所以对于 activeSegment 对象而言,我们应该以 Log 对象中记录的 activeSegment 对应的最大物理地址作为上界。另外一个需要考虑的问题是在读取 activeSegment 对象过程中,因为追加消息而产生了新的 activeSegment 对象的情况,那么此时 Log#read
方法持有的 activeSegment 对象就变成前任了,也就不会再有写操作同时发生的问题,所以可以直接读取到该 activeSegment 对象的结尾位置。
删除日志数据
本章节的最后,一起来看一下 Log#delete
方法,该方法会删除当前 Log 对象对应 log 目录,以及目录下的所有文件,并清空 SkipList 对象。方法实现如下:
1 2 3 4 5 6 7 8 9 10 private [log] def delete () { lock synchronized { logSegments.foreach(_.delete()) segments.clear() Utils .delete(dir) } }
具体逻辑如代码注释,比较简单。
LogManager 组件
LogManager 是 Kafka 日志数据操作的入口,基于上一节分析的 Log 类对象提供了对日志数据的加载、创建、删除,以及查询等功能。我们在配置 Kafka 服务时,可以通过 log.dirs
配置项为一个 broker 节点指定多个 log 目录,这些目录均由 LogManager 负责管理。LogManager 在启动时会校验 log.dirs
配置,确保指定的 log 目录没有重复的配置且都是可读的,同时对于不存在的目录会执行创建。每个 log 目录下包含多个 topic 分区目录,每个 topic 分区目录由一个 Log 类对象对其进行管理,LogManager 会记录每个 topic 分区对象及其对应的 Log 类对象之间的映射关系。LogManager 类的字段定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 class LogManager (val logDirs: Array [File ], // log 目录集合,对应 log.dirs 配置,一般选择 log 数目最少的目录进行创建 val topicConfigs: Map [String , LogConfig ], // topic 相关配置 val defaultConfig: LogConfig , val cleanerConfig: CleanerConfig , // log cleaner 相关配置 ioThreads: Int , // 每个 log 目录下分配的执行加载任务的线程数目 val flushCheckMs: Long , val flushCheckpointMs: Long , val retentionCheckMs: Long , scheduler: Scheduler , // 定时任务调度器 val brokerState: BrokerState , // 当前 broker 节点的状态 time: Time ) extends Logging { val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" private val logCreationOrDeletionLock = new Object private val logs = new Pool [TopicPartition , Log ]() private val logsToBeDeleted = new LinkedBlockingQueue [Log ]() private val dirLocks = this .lockLogDirs(logDirs) private val recoveryPointCheckpoints = logDirs.map( dir => (dir, new OffsetCheckpoint (new File (dir, RecoveryPointCheckpointFile )))).toMap val cleaner: LogCleaner = if (cleanerConfig.enableCleaner) new LogCleaner (cleanerConfig, logDirs, logs, time = time) else null }
LogManager 在实例化过程中会执行以下操作:
遍历处理配置的 log 路径(对应 log.dirs
配置),如果对应的路径不存在则创建,同时校验路径是否存在重复、是否是目录,以及是否可读;
遍历配置的 log 目录,尝试对每个目录在文件系统层面加锁,这里加的是进程锁;
遍历配置的 log 目录,为每个目录下的 recovery-point-offset-checkpoint 文件创建对应的 OffsetCheckpoint 对象,用于管理每个 topic 分区对应的 HW offset 信息;
遍历配置的 log 目录,将每个 topic 分区对应的日志数据封装成 Log 对象,同时记录需要被删除的 topic 分区目录,等待后续删除。
文件 recovery-point-offset-checkpoint 用于记录每个 topic 分区对应的 HW offset 信息,当 broker 节点重启时辅助恢复每个 topic 分区的日志数据。一个简单的文件示例如下:
1 2 3 4 5 6 7 8 9 10 0 8 topic-default 3 2271154 topic-default 2 2271351 topic-default 4 2271051 topic-default 0 2270751 topic-default 5 2271558 topic-default 1 2272018 topic-default 7 2271197 topic-default 6 2270673
其中第一行是版本号,第二行是记录条数,从第三行开始每一行都记录着“topic partition HW”信息。OffsetCheckpoint 类定义了 OffsetCheckpoint#write
和 OffsetCheckpoint#read
两个方法,用于对 recovery-point-offset-checkpoint 执行读写操作。
步骤 4 会执行加载每个 log 目录下的日志文件,并为每个 topic 分区对应的日志目录创建一个 Log 对象,对于标记为需要删除的 topic 分区目录(对应“-delete”后缀的目录),则将其 Log 对象添加到 LogManager#logsToBeDeleted
字段中,等待后面的周期性任务(kafka-delete-logs)对其进行删除。相关实现位于 LogManager#loadLogs
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 private def loadLogs (): Unit = { info("Loading logs." ) val startMs = time.milliseconds val threadPools = mutable.ArrayBuffer .empty[ExecutorService ] val jobs = mutable.Map .empty[File , Seq [Future [_]]] for (dir <- this .logDirs) { val pool = Executors .newFixedThreadPool(ioThreads) threadPools.append(pool) val cleanShutdownFile = new File (dir, Log .CleanShutdownFile ) if (cleanShutdownFile.exists) { debug("Found clean shutdown file. Skipping recovery for all logs in data directory: " + dir.getAbsolutePath) } else { brokerState.newState(RecoveringFromUncleanShutdown ) } var recoveryPoints = Map [TopicPartition , Long ]() try { recoveryPoints = this .recoveryPointCheckpoints(dir).read() } catch { case e: Exception => warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e) warn("Resetting the recovery checkpoint to 0" ) } val jobsForDir = for { dirContent <- Option (dir.listFiles).toList logDir <- dirContent if logDir.isDirectory } yield { CoreUtils .runnable { debug("Loading log '" + logDir.getName + "'" ) val topicPartition = Log .parseTopicPartitionName(logDir) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0 L) val current = new Log (logDir, config, logRecoveryPoint, scheduler, time) if (logDir.getName.endsWith(Log .DeleteDirSuffix )) { logsToBeDeleted.add(current) } else { val previous = logs.put(topicPartition, current) if (previous != null ) { throw new IllegalArgumentException ( "Duplicate log directories found: %s, %s!" .format(current.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } } } jobs(cleanShutdownFile) = jobsForDir.map(pool.submit) } try { for ((cleanShutdownFile, dirJobs) <- jobs) { dirJobs.foreach(_.get) cleanShutdownFile.delete() } } catch { case e: ExecutionException => error("There was an error in one of the threads during logs loading: " + e.getCause) throw e.getCause } finally { threadPools.foreach(_.shutdown()) } info(s"Logs loading complete in ${time.milliseconds - startMs} ms." ) }
LogManager 在实例化时会为每个 log 目录创建一个指定大小的线程池,然后对目录下的子目录(不包括文件)进行并发加载,最终将每个 topic 分区目录下的日志相关数据封装成 Log 对象,并记录到 LogManager#logs
字段中,这是一个 Pool[K, V]
类型的字段,基于 ConcurrentHashMap 实现,其中这里的 key 为 Log 对象所属的 topic 分区对象。
在 LogManager 启动时(对应 LogManager#startup
方法)会注册一个名为 kafka-delete-logs 的周期性任务,该任务会周期性调用 LogManager#deleteLogs
方法对标记为“-delete”的目录执行删除操作。方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private def deleteLogs (): Unit = { try { var failed = 0 while (!logsToBeDeleted.isEmpty && failed < logsToBeDeleted.size()) { val removedLog = logsToBeDeleted.take() if (removedLog != null ) { try { removedLog.delete() info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath} ." ) } catch { case e: Throwable => error(s"Exception in deleting $removedLog . Moving it to the end of the queue." , e) failed = failed + 1 logsToBeDeleted.put(removedLog) } } } } catch { case e: Throwable => error(s"Exception in kafka-delete-logs thread." , e) } }
方法 LogManager#deleteLogs
会遍历 LogManager#logsToBeDeleted
队列,并对其中的 Log 对象调用 Log#delete
方法执行删除,如果删除异常则会归还到队列,并在下一次周期性调用时再尝试执行删除。方法 Log#delete
已经在前面分析过,这里不再重复撰述。
周期性定时任务
前面分析了启动过程中激活的 kafka-delete-logs 周期性任务,下面继续来看一下 LogManager#startup
方法的剩余实现,该方法主要的逻辑就是启动 4 个周期性任务。在 Kafka 服务启动时会创建 LogManager 实例,并调用 LogManager#startup
方法,该方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 def startup () { if (scheduler != null ) { info("Starting log cleanup with a period of %d ms." .format(retentionCheckMs)) scheduler.schedule("kafka-log-retention" , this .cleanupLogs, delay = InitialTaskDelayMs , period = retentionCheckMs, TimeUnit .MILLISECONDS ) info("Starting log flusher with a default period of %d ms." .format(flushCheckMs)) scheduler.schedule("kafka-log-flusher" , this .flushDirtyLogs, delay = InitialTaskDelayMs , period = flushCheckMs, TimeUnit .MILLISECONDS ) scheduler.schedule("kafka-recovery-point-checkpoint" , this .checkpointRecoveryPointOffsets, delay = InitialTaskDelayMs , period = flushCheckpointMs, TimeUnit .MILLISECONDS ) scheduler.schedule("kafka-delete-logs" , this .deleteLogs, delay = InitialTaskDelayMs , period = defaultConfig.fileDeleteDelayMs, TimeUnit .MILLISECONDS ) } if (cleanerConfig.enableCleaner) cleaner.startup() }
LogManager 在启动过程中启动了 4 个周期性任务和 1 个 LogCleaner 线程,这 4 个周期性任务包括:
kafka-log-retention :定期对过期或过大的日志文件执行清理操作。
kafka-log-flusher :定期对日志文件执行刷盘操作。
kafka-recovery-point-checkpoint :定期更新 recovery-point-offset-checkpoint 文件。
kafka-delete-logs :定期删除标记为需要被删除的 log 目录。
其中任务 4 我们已经在前面分析过,下面逐个来看一下前 3 个任务。 任务 1 的实现位于 LogManager#cleanupLogs
方法中,该方法会遍历所有的 Log 对象,并从两个维度对执行清理工作:
时间维度:即保证 Log 对象中所有的 LogSegment 都是有效的,对于过期的 LogSegment 执行删除操作。
空间维度:既保证 Log 对象不应过大,对于超出的部分会执行删除操作。
实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 def cleanupLogs () { debug("Beginning log cleanup..." ) var total = 0 val startMs = time.milliseconds for (log <- allLogs(); if !log.config.compact) { debug("Garbage collecting '" + log.name + "'" ) total += log.deleteOldSegments() } debug("Log cleanup completed. " + total + " files deleted in " + (time.milliseconds - startMs) / 1000 + " seconds" ) }
清理操作仅处理配置了 cleanup.policy=delete
的 Log 对象,并调用 Log#deleteOldSegments
方法执行判定和删除操作。方法 Log#deleteOldSegments
中通过调用 Log#deleteRetentionMsBreachedSegments
对过期的 LogSegment 对象执行删除操作,并调用 Log#deleteRetentionSizeBreachedSegments
方法对当前 Log 对象的大小进行判定,如果超过设定大小,则会从 Log 对象中删除部分 LogSegment 对象,以保证最终的 Log 大小在允许范围内。这两个方法最终都是调用 Log#deleteOldSegments
方法执行具体的删除操作,该方法接收一个 LogSegment => Boolean
类的函数,如果某个 LogSegment 对象满足给定的谓语,则会应用 Log#deleteSegment
方法对该 LogSegment 执行删除操作。
其中 Log#deleteRetentionMsBreachedSegments
方法给定的判定条件很简单(如下),比较当前 LogSegment 对象最大消息时间戳距离当前时间是否超过 retention.ms
毫秒,如果超过则认为该 LogSegment 已过期。
1 2 3 4 5 6 private def deleteRetentionMsBreachedSegments (): Int = { if (config.retentionMs < 0 ) return 0 val startMs = time.milliseconds this .deleteOldSegments(startMs - _.largestTimestamp > config.retentionMs) }
而 Log#deleteRetentionSizeBreachedSegments
方法则会首先计算出当前 Log 超出设定值(对应 retention.bytes
配置)的字节数,然后对 Log 中的 LogSegment 对象遍历删除,直到 Log 的大小不再超出为止。实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private def deleteRetentionSizeBreachedSegments (): Int = { if (config.retentionSize < 0 || size < config.retentionSize) return 0 var diff = size - config.retentionSize def shouldDelete (segment: LogSegment ): Boolean = { if (diff - segment.size >= 0 ) { diff -= segment.size true } else { false } } this .deleteOldSegments(shouldDelete) }
接下来继续看一下公共逻辑 Log#deleteOldSegments
方法(实现如下),该方法会基于给定的谓语 predicate 从 Log 中选择需要被删除的 LogSegment 对象,并对每个需要被删除的 LogSegment 对象应用 Log#deleteSegment
方法进行删除,包括从 Log 对象中移除该 LogSegment 对象,以及删除 LogSegment 对应的 log、index 和 timeindex 文件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private def deleteOldSegments (predicate: LogSegment => Boolean ): Int = { lock synchronized { val deletable = this .deletableSegments(predicate) val numToDelete = deletable.size if (numToDelete > 0 ) { if (segments.size == numToDelete) this .roll() deletable.foreach(deleteSegment) } numToDelete } }
如果本次删除操作需要删除 Log 中全部的 LogSegment 对象,则会调用 Log#roll
方法为当前 Log 对象的 SkipList 创建一个新的 activeSegment 对象,以保证 Log 的正常运行,该方法的实现在前面已经分析过,不再重复撰述。
再来看一下周期性 任务 2 ,该任务用于定期对日志文件执行刷盘(flush)操作。相关逻辑实现位于 LogManager#flushDirtyLogs
方法中,该方法会遍历处理每个 topic 分区对应的 Log 对象,通过记录在 Log 对象中的上次执行 flush 的时间戳与当前时间对比,如果时间差值超过一定的阈值(对应 flush.ms
配置),则调用 Log#flush
方法执行刷盘操作,该方法的实现同样在前面已经分析过,不再重复撰述。
接着来看一下周期性 任务 3 ,该任务用于定期更新每个 log 目录名下的 recovery-point-offset-checkpoint 文件。相关实现位于 LogManager#checkpointRecoveryPointOffsets
中:
1 2 3 4 5 6 7 8 9 10 11 12 13 def checkpointRecoveryPointOffsets () { logDirs.foreach(checkpointLogsInDir) } private def checkpointLogsInDir (dir: File ): Unit = { val recoveryPoints = logsByDir.get(dir.toString) if (recoveryPoints.isDefined) { this .recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint)) } }
方法会获取位于指定 log 目录下所有 topic 分区对应的 recoveryPoint 值(即当前已经落盘的日志的最大 offset),并全量更新 log 目录下的 recovery-point-offset-checkpoint 文件。
重复日志数据清理
本小节来看一下 LogCleaner 线程,如果在配置中指定了 log.cleaner.enable=true
,那么在 LogManager#startup
方法的最后会调用 LogCleaner#startup
方法启动 LogCleaner 线程对日志数据执行清理工作。前面我们在分析周期性任务 kafka-log-retention 时,已经知道该周期性任务会对日志中过大或过期的 LogSegment 对象执行清理操作,那么 LogCleaner 又是对什么执行清理呢?
我们知道 Kafka 对于生产者发来的消息都是顺序追加到日志文件中的,而 Kafka 又采用本地文件系统对日志文件进行存储,所以随着时间的流逝日志文件会越来越大,其中存储的相当一部分消息数据都具备相同的 key。如果配置了 cleanup.policy=compact
策略,那么 Kafka 的 LogCleaner 线程就会对具备相同 key 的消息进行清理操作,仅保留当前具备最大 offset 的 key 的消息。
LogCleaner 在执行清理操作时会将一个 log 分割成 clean 和 dirty 两部分。其中 clean 是上次完成清理的部分,Kafka 会在对应 log 目录下生成一个 cleaner-offset-checkpoint 文件,用于记录每个 topic 分区上一次执行清理操作的 offset 值,而 dirty 部分则是本次清理操作的目标区域,但是 dirty 中并不是所有的 LogSegment 对象都会执行清理操作,Kafka 又将这一部分分为了 cleanable 和 uncleanable 两块,能够被分为 uncleanable 的 LogSegment 对象包含两类:
当前 Log 对象中的 activeSegment 对象。
LogSegment 对象中的最大消息时间戳距离当前时间位于配置的滞后压缩时间(对应 min.compaction.lag.ms
配置)范围内。
其中不清理 activeSegment 对象,主要是为了防止竞态条件,因为 activeSegment 是可以写入的对象,这样会让清理操作变得复杂,且收益不大。
下面我们从 LogCleaner#startup
方法开始,整个清理工作主要涉及 LogCleaner、LogCleanerManager、CleanerThread,以及 Cleaner 这 4 个类。方法 LogCleaner#startup
的主要作用就是启动注册在 LogCleaner 中的 CleanerThread 线程集合。CleanerThread 继承自 ShutdownableThread 抽象类,所以 CleanerThread#doWork
方法是其处理入口,该方法只是简单调用了 CleanerThread#cleanOrSleep
方法,后者会选取一个最需要被清理的 LogSegment 区间,并执行清理工作。相关实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private def cleanOrSleep () { val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match { case None => false case Some (cleanable) => var endOffset = cleanable.firstDirtyOffset try { val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable) recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats) endOffset = nextDirtyOffset } catch { case _: LogCleaningAbortedException => } finally { cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset) } true } val deletable: Iterable [(TopicPartition , Log )] = cleanerManager.deletableLogs() deletable.foreach { case (topicPartition, log) => try { log.deleteOldSegments() } finally { cleanerManager.doneDeleting(topicPartition) } } if (!cleaned) backOffWaitLatch.await(config.backOffMs, TimeUnit .MILLISECONDS ) }
清理操作的执行流程如下:
选取一个最需要进行日志清理的 LogToClean 对象,如果存在则执行清理操作;
如果配置了 cleanup.policy=delete
策略,则对 Log 对象中过大或过期的 LogSegment 对象执行删除操作;
如果没有需要进行清理的 LogToClean 对象,则休息一会儿后重试。
其中第 2 步与前面介绍的周期性任务 kafka-log-retention 类似,这里我们重点来看一下第 1 步,这一步的核心操作是调用 LogCleanerManager#grabFilthiestCompactedLog
方法选取下一个最需要被清理的 LogToClean 对象,然后调用 Cleaner#clean
依据该对象执行清理操作。
方法 LogCleanerManager#grabFilthiestCompactedLog
的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 def grabFilthiestCompactedLog (time: Time ): Option [LogToClean ] = { inLock(lock) { val now = time.milliseconds this .timeOfLastRun = now val lastClean = allCleanerCheckpoints val dirtyLogs = logs.filter { case (_, log) => log.config.compact }.filterNot { case (topicPartition, _) => inProgress.contains(topicPartition) }.map { case (topicPartition, log) => val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager .cleanableOffsets(log, topicPartition, lastClean, now) LogToClean (topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset) }.filter(ltc => ltc.totalBytes > 0 ) this .dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0 val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio) if (cleanableLogs.isEmpty) { None } else { val filthiest = cleanableLogs.max inProgress.put(filthiest.topicPartition, LogCleaningInProgress ) Some (filthiest) } } }
上述方法的执行流程如下:
读取每个 log 目录下 cleaner-offset-checkpoint 文件,解析每个 topic 分区上次执行清理操作对应的 offset 值;
遍历处理 Log 对象集合,筛选符合要求的 Log 对象(Log 对象配置的清理策略 cleanup.policy=compact
,且该 Log 对象当前没有正在执行清理操作),并与其需要被清理的区间一起封装成对应的 LogToClean 对象;
过滤掉不包含数据,以及待清理数据占比不超过指定阈值(对应 min.cleanable.dirty.ratio
配置)的 LogToClean 对象,并从剩下的 LogToClean 集合中选择待清理数据占比最高的 LogToClean 对象。
计算待清理区间的过程由 LogCleanerManager#cleanableOffsets
方法实现,区间值包括 dirty 部分的起始 offset 值和 uncleanable LogSegment 对象的 baseOffset 值。方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 def cleanableOffsets (log: Log , topicPartition: TopicPartition , lastClean: immutable.Map [TopicPartition , Long ], now: Long ): (Long , Long ) = { val lastCleanOffset: Option [Long ] = lastClean.get(topicPartition) val logStartOffset = log.logSegments.head.baseOffset val firstDirtyOffset = { val offset = lastCleanOffset.getOrElse(logStartOffset) if (offset < logStartOffset) { if (!isCompactAndDelete(log)) warn(s"Resetting first dirty offset to log start offset $logStartOffset since the checkpointed offset $offset is invalid." ) logStartOffset } else { offset } } val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset) val compactionLagMs = math.max(log.config.compactionLagMs, 0 L) val firstUncleanableDirtyOffset: Long = Seq ( Option (log.activeSegment.baseOffset), if (compactionLagMs > 0 ) { dirtyNonActiveSegments.find { s => val isUncleanable = s.largestTimestamp > now - compactionLagMs debug(s"Checking if log segment may be cleaned: log='${log.name} ' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp} ; now - compactionLag=${now - compactionLagMs} ; is uncleanable=$isUncleanable " ) isUncleanable } map (_.baseOffset) } else None ).flatten.min debug(s"Finding range of cleanable offsets for log=${log.name} topicPartition=$topicPartition . Last clean offset=$lastCleanOffset now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset activeSegment.baseOffset=${log.activeSegment.baseOffset} " ) (firstDirtyOffset, firstUncleanableDirtyOffset) }
清理区间的起始 offset,即 firstDirtyOffset,一般都对应着 cleaner-offset-checkpoint 文件中记录的上次执行清理操作的结束 offset,但是考虑到当前 topic 分区可能是第一次执行清理操作,或者 offset 对应的 LogSegment 可能已经被删除,所以需要将其与当前 Log 对象的首个 LogSegment 的 baseOffset 进行对比,选择较大值。
清理区间的结束 offset,即 firstUncleanableDirtyOffset,也就是 uncleanable 区间的起始 offset,我们在前面介绍了 uncleanable 区间包含 2 类 LogSegment 对象,即 activeSegment 对象和最大消息时间戳距离当前时间位于配置的滞后压缩时间范围内的 LogSegment 对象,在计算 firstUncleanableDirtyOffset 时,也就是从这两类 LogSegment 中寻找最小的 baseOffset 作为清理区间的结束 offset 值。
下面来看一下 Cleaner#clean
方法,清理操作的具体执行过程正位于此,方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private [log] def clean (cleanable: LogToClean ): (Long , CleanerStats ) = { val stats = new CleanerStats () info("Beginning cleaning of log %s." .format(cleanable.log.name)) val log = cleanable.log info("Building offset map for %s..." .format(cleanable.log.name)) val upperBoundOffset = cleanable.firstUncleanableOffset this .buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats) val endOffset = offsetMap.latestOffset + 1 stats.indexDone() val deleteHorizonMs = log.logSegments(0 , cleanable.firstDirtyOffset).lastOption match { case None => 0 L case Some (seg) => seg.lastModified - log.config.deleteRetentionMs } val cleanableHorizonMs = log.logSegments(0 , cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0 L) info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)..." .format(log.name, new Date (cleanableHorizonMs), new Date (deleteHorizonMs))) for (group <- this .groupSegmentsBySize(log.logSegments(0 , endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset)) this .cleanSegments(log, group, offsetMap, deleteHorizonMs, stats) stats.bufferUtilization = offsetMap.utilization stats.allDone() (endOffset, stats) }
整个清理过程中我们重点关注一下 offsetMap 的填充过程和分组清理数据的过程,这里的 offsetMap 是一个 Kafka 自定义实现的 SkimpyOffsetMap 类型,其中主要记录了每个消息的 key 和消息在清理区间的最大 offset 值的映射关系,后面需要依据该 offsetMap 来确定需要剔除和保留的消息。填充 offsetMap 的过程位于 Cleaner#buildOffsetMap
方法中,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 private [log] def buildOffsetMap (log: Log , start: Long , end: Long , map: OffsetMap , stats: CleanerStats ) { map.clear() val dirty = log.logSegments(start, end).toBuffer info("Building offset map for log %s for %d segments in offset range [%d, %d)." .format(log.name, dirty.size, start, end)) var full = false for (segment <- dirty if !full) { this .checkDone(log.topicPartition) full = this .buildOffsetMapForSegment(log.topicPartition, segment, map, start, log.config.maxMessageSize, stats) } info("Offset map for log %s complete." .format(log.name)) } private def buildOffsetMapForSegment (topicPartition: TopicPartition , segment: LogSegment , map: OffsetMap , start: Long , maxLogMessageSize: Int , stats: CleanerStats ): Boolean = { var position = segment.index.lookup(start).position val maxDesiredMapSize = (map.slots * dupBufferLoadFactor).toInt while (position < segment.log.sizeInBytes) { this .checkDone(topicPartition) readBuffer.clear() segment.log.readInto(readBuffer, position) val records = MemoryRecords .readableRecords(readBuffer) throttler.maybeThrottle(records.sizeInBytes) val startPosition = position for (entry <- records.deepEntries.asScala) { val message = entry.record if (message.hasKey && entry.offset >= start) { if (map.size < maxDesiredMapSize) map.put(message.key, entry.offset) else return true } stats.indexMessagesRead(1 ) } val bytesRead = records.validBytes position += bytesRead stats.indexBytesRead(bytesRead) if (position == startPosition) this .growBuffers(maxLogMessageSize) } this .restoreBuffers() false }
填充的过程比较直观,上述方法会遍历清理区间的消息集合直到 offsetMap 被填满或到达区间边界为止,并在遍历过程中将持有 key 的消息及其 offset 添加到 offsetMap 中,因为消息是顺序追加的,所以能够保证 offsetMap 中记录的是当前已处理消息的对应的最大 key->offset
映射。
完成了 offsetMap 的填充,接下来方法会依据单个 LogSegment 对象和索引文件的大小上限对需要清理的 LogSegment 对象进行分组,以防止清理操作完成后生成的目标 LogSegment 对象过大或过小,保证尽量均衡。然后方法会遍历每个分组,对分组中的待清理 LogSegment 对象集合调用 Cleaner#cleanSegments
方法执行清理操作并生成最终的 LogSegment 对象替换清理操作前的 LogSegment 对象集合。方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 private [log] def cleanSegments (log: Log , segments: Seq [LogSegment ], map: OffsetMap , deleteHorizonMs: Long , stats: CleanerStats ) { val logFile = new File (segments.head.log.file.getPath + Log .CleanedFileSuffix ) logFile.delete() val indexFile = new File (segments.head.index.file.getPath + Log .CleanedFileSuffix ) val timeIndexFile = new File (segments.head.timeIndex.file.getPath + Log .CleanedFileSuffix ) indexFile.delete() timeIndexFile.delete() val records = FileRecords .open(logFile, false , log.initFileSize(), log.config.preallocate) val index = new OffsetIndex (indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize) val timeIndex = new TimeIndex (timeIndexFile, segments.head.baseOffset, segments.head.timeIndex.maxIndexSize) val cleaned = new LogSegment (records, index, timeIndex, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time) try { for (old <- segments) { val retainDeletes = old.lastModified > deleteHorizonMs info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes." .format(old.baseOffset, log.name, new Date (old.largestTimestamp), cleaned.baseOffset, if (retainDeletes) "retaining" else "discarding" )) this .cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats) } index.trimToValidSize() cleaned.onBecomeInactiveSegment() timeIndex.trimToValidSize() cleaned.flush() val modified = segments.last.lastModified cleaned.lastModified = modified info("Swapping in cleaned segment %d for segment(s) %s in log %s." .format(cleaned.baseOffset, segments.map(_.baseOffset).mkString("," ), log.name)) log.replaceSegments(cleaned, segments) } catch { case e: LogCleaningAbortedException => cleaned.delete() throw e } }
整个清理操作的执行流程可以概括如下:
创建存储清理后数据的 LogSegment 对象,及其对应的 log、index 和 timeindex 文件,注意此时的文件都以 “.cleaned” 作为后缀;
遍历处理待清理 LogSegment 对象集合,对每个需要清理的 LogSegment 对象调用 Cleaner#cleanInto
方法执行清理操作,并将清理后的数据写入步骤 1 中创建的 LogSegment 对象中;
对 index 和 timeindex 文件进行截断,剔除无效字节;
将存储清理后数据的 LogSegment 对象相关文件进行刷盘;
使用存储清理后数据的 LogSegment 对象替换 SkipList 中对应的被清理之前的 LogSegment 对象集合。
其中步骤 1 中创建的相关文件均以“.cleaned”作为文件名后缀,并在步骤 4 中将内存中的日志和索引数据落盘到对应文件中,而步骤 5 中除了会使用存储清理后数据的 LogSegment 对象替换 SkipList 中对应的被清理之前的 LogSegment 对象集合之外,还会将相关文件的后缀名由“.cleaned”改为“.swap”,并在完成剔除存储被清理之前数据的 LogSegment 对象集合后,移除文件的“.swap”后缀。前面我们在分析 Log#loadSegments
方法时曾说,如果当前 topic 分区目录下的 log 文件是以“.swap”作为后缀的,那么其中的数据是完整的,只是 broker 节点在执行交换(即移除“.swap”后缀)的过程中宕机了,再次加载时可以直接移除“.swap”后缀并加载,无需担心数据错乱或丢失,分析到这里应该对 broker 节点启动时加载数据文件的过程有更加深入的理解。
下面我们主要来看一下 Cleaner#cleanInto
方法的实现,分析清理操作的具体执行过程,方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 private [log] def cleanInto (topicPartition: TopicPartition , source: LogSegment , dest: LogSegment , map: OffsetMap , retainDeletes: Boolean , maxLogMessageSize: Int , stats: CleanerStats ) { val logCleanerFilter = new LogEntryFilter { def shouldRetain (logEntry: LogEntry ): Boolean = shouldRetainMessage(source, map, retainDeletes, logEntry, stats) } var position = 0 while (position < source.log.sizeInBytes) { this .checkDone(topicPartition) readBuffer.clear() writeBuffer.clear() source.log.readInto(readBuffer, position) val records = MemoryRecords .readableRecords(readBuffer) throttler.maybeThrottle(records.sizeInBytes) val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize) stats.readMessages(result.messagesRead, result.bytesRead) stats.recopyMessages(result.messagesRetained, result.bytesRetained) position += result.bytesRead val outputBuffer = result.output if (outputBuffer.position > 0 ) { outputBuffer.flip() val retained = MemoryRecords .readableRecords(outputBuffer) dest.append( firstOffset = retained.deepEntries.iterator.next().offset, largestOffset = result.maxOffset, largestTimestamp = result.maxTimestamp, shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp, records = retained) throttler.maybeThrottle(outputBuffer.limit) } if (readBuffer.limit > 0 && result.messagesRead == 0 ) growBuffers(maxLogMessageSize) } this .restoreBuffers() }
上述方法会深层遍历待清理 LogSegment 对象中的每一条消息,并调用 MemoryRecords#filterTo
对消息执行过滤操作,保留同时满足以下条件的消息:
消息必须具备 key,且 key 包含在 offsetMap 中;
消息的 offset 要大于等于 offsetMap 中记录的对应的 offset 值;
如果对应的消息是删除标记,只有在允许保留该标记是才会保留。
上述条件对应方法 Cleaner#shouldRetainMessage
实现,这里不再展开。在完成对一个消息集合的筛选操作之后,如果所有的消息均需要被保留,则只需要将消息集合写入到目标 buffer 中即可。否则,如果只有部分消息需要被保留,则需要对这部分保留的消息重新压缩(如果需要的话),然后写入目标 buffer 中。
总结
本文我们按照日志数据的组织结构由下往上分析了 LogSegment、Log 和 LogManager 组件,了解了 Kafka 的日志存储机制,其中 Log 用于存储和管理一个 topic 分区下的所有有效的消息数据,并将消息及其索引数据分片采用 LogSegment 对象进行管理。LogManager 实现了 4 个周期性任务分别用于对日志和索引数据执行定期清理、删除、刷盘,以及记录 HW 等操作,同时还维护了一个清理线程对具备相同 key 的重复消息数据进行清理,以减少对磁盘空间的无用消耗。LogManager 并没有提供对日志数据的读写操作,而是委托给相应 topic 分区的 Log 对象执行。