【发布时间】:2020-06-02 23:24:06
【问题描述】:
我正试图围绕 Kafka Streams 进行思考,并且有一些我自己似乎无法解决的基本问题。我了解 KTable 和 Kafka State Stores 的概念,但在决定如何处理这个问题时遇到了麻烦。我也在使用 Spring Cloud Streams,它在此之上增加了另一个级别的复杂性。
我的用例:
我有一个规则引擎,它读取 Kafka 事件、处理事件、返回匹配的规则列表并将其写入另一个主题。这是我目前所拥有的:
@Bean
public Function<KStream<String, ProcessNode>, KStream<String, List<IndicatorEvaluation>>> process() {
return input -> input.mapValues(this::analyze).filter((host, evaluation) -> evaluation != null);
}
public List<IndicatorEvaluation> analyze(final String host, final ProcessNode process) {
// Does stuff
}
一些有状态的规则如下所示:
[some condition] REPEATS 5 TIMES WITHIN 1 MINUTE
[some condition] FOLLOWEDBY [some condition] WITHIN 1 MINUTE
[rule A exists and rule B exists]
我当前的实现是将所有这些信息存储在内存中以便能够执行分析。由于显而易见的原因,它不容易扩展。所以我想我会把它保存到 Kafka State Store 中。
我不确定最好的解决方法。我知道有一种方法可以创建允许更高级别灵活性的自定义状态存储。我不确定 Kafka DSL 是否会支持这一点。
对 Kafka Streams 还是新手,不介意听到各种建议。
【问题讨论】:
标签: java apache-kafka apache-kafka-streams spring-cloud-stream