【问题标题】:Does KStream filter consume every message?KStream 过滤器是否消耗每条消息?
【发布时间】:2021-10-15 07:28:10
【问题描述】:

我过去使用过 Kafka,但从未使用过流 API。我的任务是构建一个可扩展的服务,该服务接受 websocket 连接并根据用户 ID 将来自中心主题的出站消息路由到正确的会话。

使用 KStream 看起来非常简单。来自one online tutorial

builder.stream(inputTopic, Consumed.with(Serdes.String(), publicationSerde))
        .filter((name, publication) -> "George R. R. Martin".equals(publication.getName()))
        .to(outputTopic, Produced.with(Serdes.String(), publicationSerde));

但是过滤器命令是否会消耗来自主题的每条消息并在应用程序空间中执行过滤器?或者 KStream filter(Predicate super K,? super V> predicate) 是否包含进入 Kafka 内部工作的钩子,只允许它接收匹配正确键的消息?

KStream javadoc 上的措辞似乎暗示了前者:“逐条消费消息。”

如果过滤器的唯一目的是消耗主题的每条消息并丢弃不相关的消息,我可以手动完成。

【问题讨论】:

    标签: apache-kafka apache-kafka-streams spring-kafka


    【解决方案1】:

    你是对的 - 消息需要反序列化,然后根据谓词检查(在应用程序空间中)

    扔掉不相关的,我可以手动处理

    当然可以,但 Kafka Streams 有用于 defining session windows 的有用方法。此外,您无需定义消费者和生产者实例来转发新主题。

    【讨论】:

    • 另一个考虑因素:如果我的唯一目标是使用用户 ID 作为键来识别与特定 websocket 连接匹配的消息,我可以将用户 ID 添加到消息的标题中并跳过反序列化。 KStream 将需要对数据进行反序列化,这对于 CPU/内存消耗/垃圾收集来说并非微不足道。
    • 除非您手动处理反序列化,否则当客户端获取带有标头的对象的句柄时,记录仍将包含反序列化数据
    猜你喜欢
    • 1970-01-01
    • 2019-06-12
    • 1970-01-01
    • 1970-01-01
    • 2019-07-18
    • 2022-11-17
    • 2020-05-02
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多