【发布时间】:2018-03-06 20:30:39
【问题描述】:
我有 2 个 kafka 主题 - recommendations 和 clicks。第一个主题具有由唯一 ID(称为recommendationsId)作为键的推荐对象。每个产品都有一个用户可以点击的 URL。
clicks 主题获取通过点击推荐给用户的产品 URL 生成的消息。它已经如此设置,这些点击消息也由recommendationId 键入。
注意
推荐和点击之间的关系是一对多的。一条建议可能会导致多次点击,但一次点击始终与一条建议相关联。
每个点击对象都会有一个相应的推荐对象。
点击对象的时间戳会晚于推荐对象。
推荐和相应点击之间的差距可能是几秒到几天(比如最多 7 天)。
我的目标是使用 Kafka 流加入来加入这两个主题。我不清楚的是我应该使用 KStream x KStream 连接还是 KStream x KTable 连接。
我通过recommendations 表加入clicks 流来实现KStream x KTable 连接。但是,如果建议是在加入者启动之前生成的并且在加入者开始之后点击到达,我将看不到任何加入的点击推荐对。
我是否使用了正确的连接?我应该使用KStream x KStream 加入吗?如果是这样,为了能够加入最多过去 7 天的推荐点击,我应该将窗口大小设置为 7 天吗?在这种情况下我是否还需要设置“保留”期限?
我执行KStream x KTable 加入的代码如下。请注意,我已经定义了类Recommendations 和Click 以及它们对应的serde。点击消息只是简单的String(网址)。此 URL 字符串与 Recommendations 对象连接以创建一个 Click 对象,该对象被发送到 jointTopic。
public static void main(String[] args){
if(args.length!=4){
throw new RuntimeException("Expected 3 params: bootstraplist clickTopic recsTopic jointTopic");
}
final String booststrapList = args[0];
final String clicksTopic = args[1];
final String recsTopic = args[2];
final String jointTopic = args[3];
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_joiner_id");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, booststrapList);
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, JoinSerdes.CLICK_SERDE.getClass().getName());
KStreamBuilder builder = new KStreamBuilder();
// load clicks as KStream
KStream<String, String> clicksStream = builder.stream(Serdes.String(), Serdes.String(), clicksTopic);
// load recommendations as KTable
KTable<String, Recommendations> recsTable = builder.table(Serdes.String(), JoinSerdes.RECS_SERDE, recsTopic);
// join the two
KStream<String, Click> join = clicksStream.leftJoin(recsTable, (click, recs) -> new Click(click, recs));
// emit the join to the jointTopic
join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);
// let the action begin
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
只要在加入者(上述程序)运行之后生成推荐和点击,就可以正常工作。但是,如果在运行加入者之前生成推荐的点击到达,我看不到任何加入发生。我该如何解决这个问题?
如果解决方案是使用KStream x KSTream加入,那么请帮助我了解我应该选择什么窗口大小以及选择什么保留期。
【问题讨论】:
标签: java join java-8 apache-kafka-streams