【发布时间】:2019-08-25 07:23:14
【问题描述】:
我想使用 Kafka Streams Processor API 并在预定的 punctuator function 中每分钟生成一些消息。 Kafka Streams 能否保证这些消息只写入输出主题一次?
我知道在 Kafka Streams 中可以进行一次性处理,因为它通过以下操作进行单个事务:
- 向输入主题提交偏移量
- 将结果写入输出主题
这个概念是否扩展到处理器 API 中的标点符号函数,没有相关的输入消息需要提交?
例如,此标点符号函数迭代 key value state store 中的项目。每个项目都从存储中删除并转发到下游:
override def punctuate(timestamp: Long) : Unit =
store.all.asScala.foreach { keyValue =>
store.delete(keyValue.key)
context.forward(keyValue.key, keyValue.value)
}
商店中的每条消息都应该在输出主题上出现一次,即使在处理器发生故障并重新启动的情况下也是如此。
假设存储是持久的;它由 kafka 更改日志主题支持。标点符号按每分钟挂钟时间安排。我在我的配置中配置了processing.guarantee=exactly_once。
【问题讨论】:
标签: apache-kafka apache-kafka-streams