Broker日志持久化
日志原理介绍
在讲
Kafka
日志源码之前,我们要先对 Kafka
日志有一个大体的认识这也是阅读源码的关键,一步一步来
前面我们聊到了
Kafka
的生产端的整体架构可以看到,我们每一个
Topic
都可以分为多个 Partition
,而每一个 Partition
对应着一个 Log
但这里会存在两个问题,如果我们的数据过大
- 一个
Log
能装下吗?
- 就算能装下,插入/查询速度怎么保证?
所以,
Kafka
在这里引入了日志分段(LogSegment
)的概念,将一个 Log
切割成多个 LogSegment
进行存储实际上,这里的
Log
和 LogSegment
并不是纯粹的物理意义上的概念Log
对应的文件夹
LogSegment
对应磁盘上的一个日志文件和两个索引文件- 日志文件:以
.log
为文件后缀 - 两个索引文件:
- 偏移量索引文件(以
.index
为文件后缀) - 时间戳索引文件(以
.timeindex
为文件后缀)
这里有个重点要记一下:每个
LogSegment
都有一个基准偏移量 baseOffset
,用来表示当前 LogSegment
第一条消息的 offset
日志和索引文件命名都是按照基准偏移量进行命名,所以日志整体架构如下:
这里我们简单介绍下这个日志是怎么搜索的,后面会深入源码细聊
二、日志源码
我们回顾一下上篇文章的整体流程图:
我们可以看到,消息的处理是通过
KafkaApis
来进行的,日志持久化通过 case ApiKeys.PRODUCE => handleProduceRequest(request)
本篇我们也围绕这个方法展开
1、授权校验
2、消息添加
- 【重点】timeout:超时时间
- 【重点】requiredAcks:指定了在记录追加到副本后需要多少个副本进行确认,才认为写操作成功
0
: 不需要任何副本的确认1
: 只需要主副本确认1
或all
: 需要所有副本的确认
internalTopicsAllowed
:是否允许将记录追加到内部主题
isFromClient
:请求是否来自客户端
- 【重点】
entriesPerPartition
:包含了通过授权验证的主题分区和对应的内存记录
responseCallback
:回调函数,在记录追加完成后,会调用该回调函数发送响应给客户端。
recordConversionStatsCallback
:处理记录转换统计信息的逻辑
我们主要关心这三个参数即可:
timeout
、requiredAcks
、entriesPerPartition
,其余的目前不太重要这里的追加本地日志就是我们本篇的重点
2.1 获取 Partition
2.2 向 Leader 追加日志
2.2.1 是否创建 segment
这里就到了我们一开始图中的
LogSegment
一共有六个条件,触发这六个条件,就会重新创建一个
segment
timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs
:判断时间等待是不是超时
size > rollParams.maxSegmentBytes - rollParams.messagesSize
:当前segment
是否有充足的空间存储当前信息
size > 0 && reachedRollMs
:当前日志段的大小大于0,并且达到了进行日志分段的时间条件reachedRollMs
offsetIndex.isFull
:偏移索引满了
timeIndex.isFull
:时间戳索引满了
!canConvertToRelativeOffset(rollParams.maxOffsetInMessages)
:无法进行相对偏移的转换操作
整体来看,六个条件也比较简单,我们继续往后看
2.2.2 创建 segment
2.2.2.1 文件路径校验
2.2.2.2 segment 参数
- dir:日志段所在的目录
- baseOffset:日志段的基准偏移量
- config:日志的配置信息
- time:时间对象,用于处理时间相关的操作。
- fileAlreadyExists:指示日志文件是否已经存在
- initFileSize:初始文件大小
- preallocate:是否预分配文件空间
- fileSuffix:文件后缀
2.2.2.3 生成 segment
这里有一个重点需要关注一下,那就是
mmap
的零拷贝OffsetIndex
和 TimeIndex
他们继承 AbstractIndex
,而 AbstractIndex
中使用 mmp
作为 buffer
另外,这里先提一个知识点,后面会专门写一篇文章来分析一下
我们索引在查询的时候,采用的是二分查找的方式,这会导致 缺页中断,于是
kafka
将二分查找进行改进,将索引区分为 冷区 和 热区,分别搜索,尽可能保证热区的页在 Page Cache
里面,从而避免缺页中断。当我们的
segment
生成完之后,就返回了2.2.3 向 segment 添加日志
2.2.3.1 稀疏索引
kafka
中的偏移量索引和时间戳索引都属于稀疏索引何为稀疏索引?
正常来说,我们会为每一个日志都创建一个索引,比如:
但这种方式比较浪费,于是采用稀疏索引,如下:
当我们根据偏移量索引查询
1
时,可以查询到日志为 1
的,然后往下遍历搜索想要的即可。2.2.3.2 偏移量索引
2.2.3.3 时间戳索引
2.2.3.4 索引总结
我们的偏移量索引如图下所示:
- 当我们查询一个消息时,比如消息位移为
23
的 - 根据二分查找找到偏移量索引下标
22
- 利用上述我们偏移量
Map
的存储,得到其日志位置RecordBatch:firstOffset=23 position=762
- 再根据日志位置,找到真正存储日志的地方
我们的时间戳索引如图下所示:
- 基本和我们的偏移量索引类似,只是增加了一层二分查找
2.2.4 flush刷新
在我们前面添加完之后,我们的数据仅仅是写到
PageCache
里面,需要进行 flush
将其刷新到磁盘中2.3 Follow 获取日志
同样,我们的
Follow
在获取日志时,和我们 Leader
添加日志时一样的方法三、流程图
Loading...