【问题标题】:Kafka not deleting key with tombstone卡夫卡没有用墓碑删除密钥
【发布时间】:2018-03-19 20:27:24
【问题描述】:

我使用以下属性创建了一个 kafka 主题

min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,segment.ms=100,cleanup.policy=compact

假设我按顺序插入 k-v 对 1111:1、1111:2、1111:null、2222:1 现在发生的是除了最后一条消息,日志压缩在其余消息上运行并清除前两条但保留 1111:null

根据文档,

Kafka log compaction also allows for deletes. A message with a key and a null payload acts like a tombstone, a delete marker for that key. Tombstones get cleared after a period.

所以,我希望当 delete.retention.ms 实现时,空标记应该删除带有键 1111

的消息

我有两个问题 - 为什么墓碑标记不起作用?为什么最后一条消息从压缩中被忽略?

这就是 server.properties 文件的内容 -

log.retention.ms=100
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=100
log.cleaner.delete.retention.ms=100
log.cleaner.enable=true
log.cleaner.min.cleanable.ratio=0.01

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    按照设计,墓碑记录的保存时间更长。原因是经纪人不跟踪消费者。假设消费者在阅读第一条记录后离线了一段时间。当消费者关闭时,日志压缩开始了。如果日志压缩会删除墓碑记录,那么消费者将永远不会知道该记录已被删除的事实。如果消费者实现了缓存,则记录可能永远不会被删除。因此,tombstone 会保留更长时间,以允许离线消费者接收所有 tombstone 以进行本地清理。

    只有在delete.retention.ms 之后才会删除墓碑(默认值为 1 天)。注意:这是一个主题级别的配置,没有代理级别的配置。因此,如果你想改变它,你需要为每个主题设置配置。

    【讨论】:

    • 我在帖子中描述了该主题是使用 delete.retention.ms=100 创建的,这意味着在发送后 100 毫秒后将清除墓碑标记键。一个问题:它可以与 compact.policy 设置有关吗?我在某处读到我们需要将其设置为紧凑,删除以启用删除。
    • 如果启用“compact,delete”,您基本上会在顶部获得一个 TTL,并且保留时间较早的记录将被删除(即使没有墓碑)。参照。 cwiki.apache.org/confluence/display/KAFKA/… 你能仔细检查一下主题配置,它真的配置了 delete.retention.ms=100 吗?
    • 我在该主题上运行了 --describe,输出为 Configs:min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,cleanup.policy=compact,segment.ms= 100
    • 在您的情况下,使 delete.retention.ms=0 应该删除墓碑。详情请参阅我的回答。
    • 如果缓存只有读到tombstone才删除,如果consumer由于某种原因长时间离线,写入tombstone并删除记录,也会立即删除tombstone ,消费者将在缓存中有一个“悬空”条目,因为它永远不会看到墓碑。
    【解决方案2】:

    压缩主题有两部分:

    1) 清洁部分:kafka 原木的一部分被 kafka 清洁器至少清洁一次。

    2) 脏部分:kafka 原木的一部分直到现在都没有被 kafka 清洁器清洁一次。 Kafka 维护脏偏移。所有偏移量 >= 脏偏移量的消息都属于脏部分。

    注意:Kafka 清理器会清理所有段(无论段是否处于清理/脏部分)并在每次脏比达到 min.cleanable.dirty.ratio 时重新复制它们。

    墓碑被逐段删除。如果段满足以下条件,则删除段中的墓碑:

    1. 段应该在日志的清理部分。

    2. 段的最后修改时间应为

    很难详细说明第二点,但简单来说,第二点意味着 => 段大小应等于 log.segment.bytes/segment.bytes(默认为 1GB)。要使段大小(更干净的部分)等于 1GB,您需要生成大量具有不同键的消息。但是您只生成了 4 条消息,其中 3 条消息具有相同的密钥。这就是为什么在包含 1111:null 消息的段中没有删除墓碑的原因(段不满足我上面提到的第二点)。

    您有两个选项来删除带有 4 条消息的墓碑:

    1. 使 delete.retention.ms=0 或
    2. 使 log.segment.bytes/segment.bytes=50。

    源代码(补充阅读): https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogCleaner.scala

    try {
          // clean segments into the new destination segment
          for (old <- segments) {
            val retainDeletes = old.lastModified > deleteHorizonMs
            info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
                .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
            cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
          }
    

    【讨论】:

      【解决方案3】:

      在压缩文件中删除墓碑的算法应该如下。

      1. 当墓碑仍在日志的脏部分中时,它永远不会被删除。
      2. 在 tombstone 位于日志的已清理部分后,我们通过 delete.retention.ms 进一步延迟 tombstone 的删除,因为 tombstone 位于已清理部分中。

      墓碑可能仍在日志的脏部分中,因此未清除。触发更多不同键的消息应该会将墓碑推入日志的已清理部分并删除它们。

      【讨论】:

      • 我已经验证了@yatishby 的解决方案,它有效。首先,下载kcat,然后使用此命令echo "i:1" | kcat -b localhost:9092 -t test -Z -K: 向Kafka 代理生成多条消息。它将墓碑推到日志的尾部并触发墓碑的移除。
      【解决方案4】:

      从compaction的角度来看,kafka log包含三个部分:

      1. 活动段,新记录将附加到该段。
      2. 日志头,尚未压缩。
      3. 日志尾部,已压缩或正在压缩。

      卡夫卡documentation:

      请注意,此压缩截止日期为not a hard guarantee,因为它仍受日志清理线程的可用性和实际压缩时间的影响。您需要监控 uncleanable-partitions-count、max-clean-time-secs 和 max-compaction-delay-secs 指标。

      正如@yatishby 指出的那样,如果你的墓碑在活动段或日志头中,则不会触发压缩。

      因此,我尝试将数十条消息刷新到主题,并将墓碑从日志头移到日志尾。

      然后,它起作用了,墓碑已成功移除。

      这里是重现它的步骤。

      Step1:发送墓碑到Kafka

      我创建了一个与您配置相同的测试主题。然后将墓碑发送给卡夫卡。如您所见,即使指定了delete.retention.ms=100,墓碑仍保留在那里。好像不行。

      Step2:将相同的消息刷新到 Kafka。

      # do it 20 times.
      echo "g:1" | kcat -b localhost:9092 -t test -Z -K:
      echo "j:1" | kcat -b localhost:9092 -t test -Z -K:
      

      Kcat 是一个非常方便的工具,可以向 kafka 生成消息。

      Step3:墓碑消失了。

      备忘单

      1. 如何使用kcat 为 Kafka 生成墓碑?

        echo "yourkey:" | kcat -b localhost:9092 -t test -Z -K:
        
      2. 如何消费来自主题的消息?

        kafka-console-consumer --bootstrap-server localhost:9092 --topic test  \
        --from-beginning \                             
        --formatter kafka.tools.DefaultMessageFormatter \
        --property print.timestamp=true \
        --property print.key=true \
        --property print.value=true
        

      【讨论】:

        猜你喜欢
        • 2017-04-09
        • 2018-09-06
        • 2015-06-04
        • 2014-10-16
        • 2019-06-24
        • 2019-06-18
        • 2013-01-14
        • 2021-10-27
        • 1970-01-01
        相关资源
        最近更新 更多