【问题标题】:Kafka Streams: Windowing by field in record json valueKafka Streams:按记录 json 值中的字段开窗
【发布时间】:2022-01-07 08:42:54
【问题描述】:

我需要聚合一个流,它是两个其他流的连接。为此,我指定了 1 天的窗口,但我需要将存储在消息的 json 中的值用作时间戳。为流指定自己的时间戳是否现实?

//Record of stream1: {"a_id": 1, "b_id": 2}
//Record of stream2: {"b_id": 2, "timestamp": ...}

KStream<Long, JsonNode> aStream = builder
                .stream(aTopic, Consumed.with(Serdes.String(), jsonSerde))
                .selectKey((k, v) -> v.get("b_id").asLong());

KStream<Long, JsonNode> bStream = builder
                .stream(bTopic, Consumed.with(Serdes.String(), jsonSerde))
                .selectKey((k, v) -> v.get("b_id").asLong());

aStream.join(bStream, (JsonNode v1, JsonNode v2) ->
                                JsonUtils.addFieldIntoJsonNode(v1, v2.get("timestamp"), "timestamp"),
                        JoinWindows.of(Duration.ofHours(1)),
                        StreamJoined.with(Serdes.Long(), jsonSerde, jsonSerde))
.{some aggregation with windowing by that "timestamp" field}

我尝试使用时间戳提取器,但我只能在读取不适合的流时指定它,因为两个流中的连接窗口将不同。 这种情况下怎么办?

【问题讨论】:

  • 我想在一般情况下,如果没有缓存,这几乎无法解决,这扼杀了流式传输的想法。您对两个流中 b_id 的顺序和存在是否有任何限制/保证。请在问题中添加一些信息。例如,在这里阐明: 1. b_id 顺序在两个流中是否相同; 2. 如果特定的 b_id 值出现在一个流中而在另一个流中丢失,我们期望做什么; 3. 与 #2 相同,但 b_id 重复。 ...基本上我们需要压缩两个流,但一般情况下它不会工作。
  • 这两个流可以作为关系数据库中的表。在 stream2 中,所有 b_id 值都是唯一的,但在 stream1 中它们可能会重复。如果“b_id”stream2 中的某些值必须与 stream1 中的相关记录相关,则没有问题,但如果 stream1 中的某些值不存在于 stream2 中,则没有办法。顺便说一句,我使用的是内连接,所以,我认为这些约束无关紧要。
  • 两个流中 b_id 的顺序如何?我们可以依靠没有像{1,2,3,4,1},{1,3,2,4,5}这样的情况吗?

标签: java apache-kafka apache-kafka-streams windowing


【解决方案1】:

您可以编写自己的ProcessorTransformer 并在其中使用ProcessorContext。如果您的 Kafka Streams 版本足够新,您应该找到方法 ProcessorContext.&lt;K,V&gt; forward(K key, V value, To to)To 类允许使用时间戳的规范。最简单的调用是To.all().withTimestamp(123456789L)

【讨论】:

    【解决方案2】:

    您可以使用自定义时间戳提取器,您可以通过default.timestamp.extractor 配置为所有输入主题全局设置,也可以通过Consumed.with(...).withTimestampExtractor(...) 逐个主题传递。

    参见https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html#default-timestamp-extractor

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-10-31
      • 2021-09-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多