【发布时间】:2019-09-26 06:55:18
【问题描述】:
我们正在尝试实现下面描述的用例,我们遇到了我们希望克服的实现问题,
用例,
我们正在尝试通过匹配两个流的消息(JSON)中存在的 KEY 来在 2 个 Kafka 主题之间进行 KStream 连接。 我们还应该维护消息序列,因为它是从源到达 KStream 的。
场景是, 如果 Matching Key 尚未到达任何一个流中,我们应该停止或重试加入,直到预期的 key 到达其他主题。 我们曾考虑将不匹配的记录放回 KStream,但在这种情况下无法保证顺序。
问题 1: 如何停止或保持连接,直到预期的键到达其他主题。 例如,KTable 有 Key 100,但是 KStream 还没有收到 Key 100 那么我们应该重试 Join 或保持 KStream 直到 Key 100 到达。
问题 2: 有没有办法在KStream(Delayed KStream)中放置延迟或间隔来接收延迟时间或间隔的消息。
此外,我们必须从非键控主题构建键控 KStream(将通过从消息 - JSON 中提取来设置键)
Java 更可取,因为我们在 KTable 和 KStream 之间进行了 POC 连接
KTable<String, String> leftStream = builder.table("stream1");
KStream<String, String> rightStream = builder.stream("stream2");
KStream<String, String> outstream = rightStream.leftJoin(leftStream, (orig_msg, description) -> {
String new_msg = "";
if (description != null) {
new_msg = orig_msg+"-->Matched--"+description;
}else {
new_msg = orig_msg+"-->UnMatched<--"+description;
}
return new_msg;
});
【问题讨论】:
标签: apache-kafka