【发布时间】:2022-01-07 08:42:54
【问题描述】:
我需要聚合一个流,它是两个其他流的连接。为此,我指定了 1 天的窗口,但我需要将存储在消息的 json 中的值用作时间戳。为流指定自己的时间戳是否现实?
//Record of stream1: {"a_id": 1, "b_id": 2}
//Record of stream2: {"b_id": 2, "timestamp": ...}
KStream<Long, JsonNode> aStream = builder
.stream(aTopic, Consumed.with(Serdes.String(), jsonSerde))
.selectKey((k, v) -> v.get("b_id").asLong());
KStream<Long, JsonNode> bStream = builder
.stream(bTopic, Consumed.with(Serdes.String(), jsonSerde))
.selectKey((k, v) -> v.get("b_id").asLong());
aStream.join(bStream, (JsonNode v1, JsonNode v2) ->
JsonUtils.addFieldIntoJsonNode(v1, v2.get("timestamp"), "timestamp"),
JoinWindows.of(Duration.ofHours(1)),
StreamJoined.with(Serdes.Long(), jsonSerde, jsonSerde))
.{some aggregation with windowing by that "timestamp" field}
我尝试使用时间戳提取器,但我只能在读取不适合的流时指定它,因为两个流中的连接窗口将不同。 这种情况下怎么办?
【问题讨论】:
-
我想在一般情况下,如果没有缓存,这几乎无法解决,这扼杀了流式传输的想法。您对两个流中 b_id 的顺序和存在是否有任何限制/保证。请在问题中添加一些信息。例如,在这里阐明: 1. b_id 顺序在两个流中是否相同; 2. 如果特定的 b_id 值出现在一个流中而在另一个流中丢失,我们期望做什么; 3. 与 #2 相同,但 b_id 重复。 ...基本上我们需要压缩两个流,但一般情况下它不会工作。
-
这两个流可以作为关系数据库中的表。在 stream2 中,所有 b_id 值都是唯一的,但在 stream1 中它们可能会重复。如果“b_id”stream2 中的某些值必须与 stream1 中的相关记录相关,则没有问题,但如果 stream1 中的某些值不存在于 stream2 中,则没有办法。顺便说一句,我使用的是内连接,所以,我认为这些约束无关紧要。
-
两个流中 b_id 的顺序如何?我们可以依靠没有像{1,2,3,4,1},{1,3,2,4,5}这样的情况吗?
标签: java apache-kafka apache-kafka-streams windowing