【发布时间】:2018-06-27 20:19:35
【问题描述】:
我了解默认 TTL 设置为无穷大(非正数)。但是,如果我们需要在存储中保留最多 2 天的数据,我们是否可以使用 RocksDBConfigSetter 接口实现进行覆盖,即 options.setWalTtlSeconds(172800)?或者它会与 Kafka 流内部冲突吗?
【问题讨论】:
标签: apache-kafka apache-kafka-streams
我了解默认 TTL 设置为无穷大(非正数)。但是,如果我们需要在存储中保留最多 2 天的数据,我们是否可以使用 RocksDBConfigSetter 接口实现进行覆盖,即 options.setWalTtlSeconds(172800)?或者它会与 Kafka 流内部冲突吗?
【问题讨论】:
标签: apache-kafka apache-kafka-streams
目前这是不可能的。由于各种技术原因,Kafka Streams 以硬编码的方式禁用了 RocksDB 的 TTL 功能。还有一张票:https://issues.apache.org/jira/browse/KAFKA-4212
目前,您可以使用窗口存储在 2 天后使旧记录过期。即,您使用 1ms 的 TimeWindow 执行 stream.groupByKey().windowedBy(...).reduce(...) 和仅返回键的最新值的“虚拟”减少。
【讨论】:
windowedBy() 函数,时间窗口为 1 小时。但是当我检查磁盘使用情况时,/tmp/kafka-streams 文件夹不断增加。 24小时从5G到20G。 RocksDB 多久会删除一次旧数据?谢谢!我的代码就像.windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO)) .reduce((event1, event2) -> event2)
reduce(..., Materialized.as(null).withRetention(...))进行配置。