【问题标题】:Kafka Streams: RocksDB TTL卡夫卡流:RocksDB TTL
【发布时间】:2018-06-27 20:19:35
【问题描述】:

我了解默认 TTL 设置为无穷大(非正数)。但是,如果我们需要在存储中保留最多 2 天的数据,我们是否可以使用 RocksDBConfigSetter 接口实现进行覆盖,即 options.setWalTtlSeconds(172800)?或者它会与 Kafka 流内部冲突吗?

参考: https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config

【问题讨论】:

标签: apache-kafka apache-kafka-streams


【解决方案1】:

目前这是不可能的。由于各种技术原因,Kafka Streams 以硬编码的方式禁用了 RocksDB 的 TTL 功能。还有一张票:https://issues.apache.org/jira/browse/KAFKA-4212

目前,您可以使用窗口存储在 2 天后使旧记录过期。即,您使用 1ms 的 TimeWindow 执行 stream.groupByKey().windowedBy(...).reduce(...) 和仅返回键的最新值的“虚拟”减少。

【讨论】:

  • 嗨,马蒂亚斯,谢谢。由于 store 是通过低级处理器 API 丰富的,请建议是否有类似于上述高级 DSL 的解决方法。如果没有,我知道有一个打开的票,我计划以特定频率迭代键值存储以删除它们
  • 您还可以通过处理器 API 创建一个窗口存储,而不是普通的键值存储。
  • 嗨,马蒂亚斯,我面临着类似的问题。我的拓扑中有 windowedBy() 函数,时间窗口为 1 小时。但是当我检查磁盘使用情况时,/tmp/kafka-streams 文件夹不断增加。 24小时从5G到20G。 RocksDB 多久会删除一次旧数据?谢谢!我的代码就像.windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO)) .reduce((event1, event2) -> event2)
  • 默认保留时间为 24 小时。您可以通过reduce(..., Materialized.as(null).withRetention(...))进行配置。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-02-08
  • 2018-03-06
  • 2016-08-03
  • 2018-09-15
  • 2018-03-07
相关资源
最近更新 更多