【问题标题】:Interactive Conditioning For Kafka StreamKafka Stream 的交互式调节
【发布时间】:2019-08-06 14:09:09
【问题描述】:

我想为我的 kafka 流创建交互条件。我只是想知道有没有可能。

示例用例如下:

我的 kafka 主题上有用户点击事件。用户在点击选项表中为自己定义了最小点击次数,我想在他们达到最小点击次数时通知他们。 Kstream 根据限制过滤点击计数。 Eventlistener 消费由 kstream 输出产生的主题数据并向用户发送通知。

如何根据用户的持久数据逐个用户定义 Kstream 过滤条件?当持久性数据发生变化时我可以改变它吗?

【问题讨论】:

    标签: java spring spring-boot apache-kafka apache-kafka-streams


    【解决方案1】:

    你需要创建两个主题:

    • user-prefs - 带有用户偏好,其中键是用户 ID,值是最小点击次数。 (好的做法是将其压缩)
    • clicks - 发送原始点击的主题,键是用户 ID,值不重要(假设一些字符串)

    使用 KafkaProducer,您将用户偏好(点击次数最少)发送到 user-prefs,如果他们会改变您需要发送新消息 用户点击转到clicks主题。

    假设您想将它们聚合一段时间(60 秒)。

    首先,您必须对点击次数进行分组和汇总,然后发送最终结果。 之后,您使用user-prefs 加入最终结果,其中保留了最小点击次数。过滤器是根据聚合点击次数和最小点击次数进行的

    KStream<String, Long> clicks = builder.<String, String>stream("clicks")
        .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(60)).grace(Duration.ofSeconds(1)))
        .count(Materialized.with(Serdes.String(), Serdes.Long()))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream().map((key, value) -> new KeyValue<>(key.key(), value));
    
    KTable<String, Long> userPrefs = builder.<String, Long>table(
        "user-prefs",
        Consumed.with(Serdes.String(), Serdes.Long())
    );
    
    clicks.join(
        userPrefs,
        (userClicks, minUserClicksNumber) -> userClicks >= minUserClicksNumber,
        Joined.with(Serdes.String(), Serdes.Long(), Serdes.Long())
    )
        .filter((userName, isSufficientNumberOfClick) -> isSufficientNumberOfClick)
        .map(((key, value) -> new KeyValue<>(key, key)))
        .to("output", Produced.with(Serdes.String(), Serdes.String()));
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-09-19
      • 1970-01-01
      • 2019-06-05
      • 1970-01-01
      • 2020-01-14
      • 1970-01-01
      相关资源
      最近更新 更多