【问题标题】:Faster building of Kafka Streams state更快地构建 Kafka Streams 状态
【发布时间】:2022-03-16 21:07:37
【问题描述】:

我在 Kafka 中默认存储了 7 天的最新流数据:

log.retention.hours=168

在部署新版本的 Streams 应用程序时,需要花费大量时间来处理旧数据才能真正使用它。

除了缩短保留期之外,还有什么方法可以让它更快吗?

我想到的是,在处理完所有数据之前,不应将状态存储保存到磁盘。

【问题讨论】:

  • 保留用于非紧凑主题。状态存储存储在压缩主题中,并且可以无限期保留,因此,是的,读取它们需要很长时间,而且没有真正的好方法

标签: apache-kafka apache-kafka-streams


【解决方案1】:

我猜你的应用中有带有变更日志主题的状态存储,而需要时间的事情是恢复应用的状态?

  1. 即使输入主题设置了保留,更改日志主题也默认将cleanup.policy 设置为压缩,因此保留无限。
  2. 密钥集的大小是多少?变更日志主题包含您存储的密钥数量,您可以尝试减少此数量以获得更小的状态。
  3. 考虑更改 segment.msmin.cleanable.dirty.ratio 以优化压缩。
  4. 考虑调整rocksDB config

【讨论】:

  • 没有更改日志,只是常规状态存储(AFAIK 只应保存最后一个值)。
  • 默认情况下,每个 statestore 都由 kafka-streams 中的更改日志备份。这就是它在应用程序重新启动时能够重建 statestore 的方式。您可以使用Materialized.withLoggingDisabled() 禁用更改日志,但重启后您会丢失状态。
  • 好的,所以对我们来说理想的解决方案是将 Streams API 配置为仅处理最新的 24 小时数据,同时保留原始主题中默认的 7 天数据。但这似乎不是一个选择。
  • 有办法做到这一点。你应该阅读并理解this article
  • 好的,我可以在处理开始时添加transform() 操作,并为早于特定时间戳的记录返回null
【解决方案2】:

我最终想到的是使用 filter 在我的 Streams 应用程序中仅处理最后 N 小时的原始数据:

myStream.filter({ (_, value) =>
        val calendar = Calendar.getInstance()
        calendar.add(Calendar.HOUR, -streamHours)

        value.timestamp > calendar.getTimeInMillis
      })

【讨论】:

    猜你喜欢
    • 2022-10-06
    • 1970-01-01
    • 2019-07-03
    • 2019-09-05
    • 2018-10-05
    • 1970-01-01
    • 1970-01-01
    • 2021-10-11
    • 2020-06-02
    相关资源
    最近更新 更多