【问题标题】:Persisting state into Kafka using Kafka Streams使用 Kafka Streams 将状态持久化到 Kafka
【发布时间】:2020-06-02 23:24:06
【问题描述】:

我正试图围绕 Kafka Streams 进行思考,并且有一些我自己似乎无法解决的基本问题。我了解 KTable 和 Kafka State Stores 的概念,但在决定如何处理这个问题时遇到了麻烦。我也在使用 Spring Cloud Streams,它在此之上增加了另一个级别的复杂性。

我的用例:

我有一个规则引擎,它读取 Kafka 事件、处理事件、返回匹配的规则列表并将其写入另一个主题。这是我目前所拥有的:

@Bean
public Function<KStream<String, ProcessNode>, KStream<String, List<IndicatorEvaluation>>> process() {
    return input -> input.mapValues(this::analyze).filter((host, evaluation) -> evaluation != null);
}

public List<IndicatorEvaluation> analyze(final String host, final ProcessNode process) {
    // Does stuff
}

一些有状态的规则如下所示:

[some condition] REPEATS 5 TIMES WITHIN 1 MINUTE
[some condition] FOLLOWEDBY [some condition] WITHIN 1 MINUTE
[rule A exists and rule B exists]

我当前的实现是将所有这些信息存储在内存中以便能够执行分析。由于显而易见的原因,它不容易扩展。所以我想我会把它保存到 Kafka State Store 中。

我不确定最好的解决方法。我知道有一种方法可以创建允许更高级别灵活性的自定义状态存储。我不确定 Kafka DSL 是否会支持这一点。

对 Kafka Streams 还是新手,不介意听到各种建议。

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams spring-cloud-stream


    【解决方案1】:

    根据您给出的描述,我相信这个用例仍然可以使用 Kafka Streams 中的 DSL 来实现。您上面显示的代码不跟踪任何状态。在您的拓扑中,您​​需要通过跟踪规则的计数来添加状态并将它们存储在状态存储中。然后,您只需在该计数达到阈值时发送输出规则。这是作为伪代码背后的一般思想。显然,您必须对其进行调整以满足用例的特定规范。

    @Bean
    public Function<KStream<String, ProcessNode>, KStream<String, List<IndicatorEvaluation>>> process() {
        return input -> input
                         .mapValues(this::analyze)
                         .filter((host, evaluation) -> evaluation != null)
                         ...
                         .groupByKey(...)
                         .windowedBy(TimeWindows.of(Duration.ofHours(1)))
                         .count(Materialized.as("rules"))
                         .filter((key, value) -> value > 4)
                         .toStream()
                        ....
    }
    

    【讨论】:

    • 投掷另一个曲线球,因为这将是一个场景。规则过期的持续时间是用户定义的,并且可能会根据规则进行更改。我更新了 OP 以反映一些示例。
    • 对于这种复杂的情况,您可能需要低级处理器 API。如果您有一个小示例,其中有一些代码要演示,我们可以进一步对其进行分类。
    猜你喜欢
    • 2023-03-23
    • 1970-01-01
    • 2022-10-06
    • 2019-07-14
    • 2020-01-20
    • 1970-01-01
    • 2019-07-03
    • 2019-09-05
    • 2018-10-05
    相关资源
    最近更新 更多