【问题标题】:Kafka Streams: Is it possible to have "compact,delete" policy on state stores?Kafka Streams:是否可以对状态存储制定“压缩、删除”政策?
【发布时间】:2018-11-10 08:28:10
【问题描述】:

Kafka Streams 状态存储默认是“紧凑的”。是否可以在状态存储中使用保留策略设置“压缩,删除”?

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    是的,可以使用保留和压缩配置主题,Kafka Streams 将此设置用于窗口化KTables。

    如果你真的要设置,可以在创建后手动更新对应的changelog topic config。

    但是,为更改日志主题设置主题保留时间只会从主题中删除数据。本地状态存储中的数据未删除。状态存储不提供 TTL,并且 RocksDBs 的 TTL 设置无法启用(出于技术原因,我们希望最终解决)。

    如果你想彻底删除数据,你应该使用 tombstone 消息从存储中删除数据以及更改日志主题(而不是使用保留时间)。

    【讨论】:

    • 所以,您可以将 tombstone 消息发送到 statestore 更改日志主题,然后“实时” statestore 将了解它们并将它们应用于 in-mem 和 RocksDB 表示?
    • 没有。您永远不应该写更改日志主题!这会弄乱你的状态。如果您将主题直接读取为表格 (builder.table(...)),则将墓碑写入输入主题。如果你的表是聚合的结果,你的聚合器需要返回null作为聚合结果;这将从存储中删除密钥并将墓碑写入更改日志。
    • 在 PAPI 中思考,KVStore 的 store.delete(Key) 和 store.remove(WindowedKey) 将处理所有这些,那么?
    • 在窗口存储的情况下,我想删除所有需要 fetch(key) 的条目并遍历迭代器,分别删除每个 WindowedKey 条目,对吧?
    • 这是可能的。请注意,窗口存储应用保留时间。这可能已经提供了您需要的内容,而无需通过自定义代码删除旧窗口。
    【解决方案2】:

    如果您使用默认的 RocksDBStore,则可以选择将 CompactionStyle 设置为 FIFO:

    FIFO 压缩风格是最简单的压缩策略。它适用于以非常低的开销保存事件日志数据(例如查询日志)。它会定期删除旧数据,因此它基本上是一种 TTL 压缩样式。

    然后使用 TTL:

    为此引入了一个新选项 compaction_options_fifo.ttl,以删除 TTL 已过期的 SST 文件。此功能使用户能够根据时间而不是始终根据大小删除文件,例如,删除所有超过一周或一个月的 SST 文件。

    RocksDB FIFO doc

    要实际设置 FIFO,您必须实现 RocksDBConfigSetter 并将其设置为配置属性:rocksdb.config.setter

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-01-04
      • 2017-12-18
      • 2023-01-18
      • 2019-07-03
      • 1970-01-01
      • 2018-10-24
      • 2021-10-11
      相关资源
      最近更新 更多