1、日志文件布局
__consumer_offset 存储着消费者提交的消费位移
2、日志存储格式
V0
消息格式
-
offset:
每一条消息都有一个 offset 用来标志它在分区中的偏移量,offset 是逻辑值,而非实际物理偏移量 -
message_size:
表示消息的大小 -
crc32(4B):
crc32 校验值。校验范围为 magic 至 value 之间 -
magic(1B):
消息格式版本号,V0 固定为 0 -
attributes(1B):
消息的属性。总共占1个字节,低 3 位表示压缩类型。
0:NONE、1:GZIP、2:SNAPPY;3:LZ4. 其余位保留 -
key length(4B):
-1,表示没有设置 key -
key:
可选 -
value lengtj(4B):
消息体的长度,-1,表示为空 -
value:
消息体
V1
kafka 从 0.10.0 版本至 0.11.0 版本之前所使用的消息格式版本为 v1, 比 v0 多了一个 timestamp 字段
消息格式
- magic(1B):
消息格式版本号,V0 固定为 1 - attributes(1B):
前三位同样表示压缩类型。
第4位:0-timestamp 类型为CreateTime, 1-timestap 类型为LogAppendTimetimestamp类型由broker端参数log.message.timestamp.type配置,默认值为CreateTime
V2
kafka 版本从 0.11.0 之后都称为 v2.
- length
消息总长度 - attributes
弃用 - timestamp delta
时间戳增量,保存与RecordBatch起始时间戳的差值 - offset delta
位移增量,保存与RecordBatch起始位移的差值 - headers
支持应用级别的扩展 - first offset
当前RecordBatch的起始位移 - length
计算从partitionleader epoch字段开始到末尾的长度 - partitionleader epoch
分区 leader 的版本号或更新次数 - magic
V2 固定为 2 - attributes
占用 2 个字节;低三位表示压缩格式,第4位表示时间戳类型,第5位表示RecordBatch是否处于事务中,0-非事务,1-事务。第6位表示是否是控制消息,0-非控制,1-控制,控制消息用来支持事务功能 - last offset delta
RecordBatch中最后一个Record的offset与first offset的差值。最要被broker用于确保RecordBatch中Record组装的正确性 - first timestamp
RecordBatch第一条Record时间戳 - max timestamp
RecordBatch中最大时间戳 - producer id
PID,用于支持幂等和事务 - producer epoch
用于支持幂等和事务 - first sequence
用于支持幂等和事务 - record count
RecordBatch中Record个数
3、消息压缩
broker 可通过 compression.type 配置压缩方式,默认 producer,即保留生产者的压缩方式
何时压缩
kafka 可能在 producer 端或者 broker 端中进行压缩。
当消息在 broker 端重新压缩时,会造成性能下降,因此需要尽量避免。
以下会在 broker 端发生重新压缩
-
broker端和producer端不同的压缩算法。 -
broker端发生消息格式转换,即新版本消息向老版本消息转化。
何时解压缩
kafka 将多条消息放一起压缩,生产者发送的是压缩的数据,broker 存储的也是压缩的数据。消费者在拉取消息时,也是压缩的数据,在处理前才会解压消息。