【问题标题】:Kafka streams foreign key join with one-many relations卡夫卡流外键加入一对多关系
【发布时间】:2021-12-06 05:35:46
【问题描述】:

有两个kafka主题

  • 新闻
  • 图片

新闻主题中的消息可以有如下图片ID列表

{ 

    "id": "news-1",
    "title": "Title news-1",
    "description": " description news-1",
    "author": " Author news-1",
    "imageIds": [
        "images-1",
        "images-2"
    ]
}

图片主题中的消息如下所示

{

    "id": "image-1",
    "url": "https://www.mypublication.co.uk/image-title-1.jpeg",
    "description": "title-1 description",
    "height": 400,
    "width": 450
}


{

     "id": "image-2",
     "url": "https://www.mypublication.co.uk/image-title-2.jpeg",
     "description": "title-2 description",
     "height": 400,
     "width": 450
 }

我正在尝试加入这两个流以填充包含所有图像详细信息的最终新闻消息。

我尝试使用 groupByaggregate 如下

 KTable<String, Image> images = builder.table(topics.getImagesTopic(), Consumed.with(Serdes.String(), imageSerde));
    KStream<String, News> news = builder.stream(topics.getNewsTopic(), Consumed.with(Serdes.String(), newsSerde));

KTable<String, NewsImages> newsImagesKTable = news.flatMapValues(newsArticle -> newsArticle.getImageIds())
      .map((newsId, imageId) -> new KeyValue<>(imageId, newsId)) // rekey not good !!?
      .join(images, (newsId, image) -> {
        return new ImageWrapper(newsId, image);
      }, Joined.with(Serdes.String(), Serdes.String(), imageSerde))
            .groupBy((imageId, imageWrapper) -> imageWrapper.getNewsId(), Grouped.with(Serdes.String(), imageWrapperSerde))
      .aggregate(NewsImages::new, (newsId, image, newsImages) -> {
        newsImages.setNewsId(newsId);
        newsImages.addImage(image);
        return newsImages;
      }, Materialized.with(Serdes.String(),newsImagesSerde));

newsImagesKTable.toStream().
      to(topics.getNewsImagesTopic());

但正如预期的那样,上面的代码聚合了新闻的所有图像

当作者第一次用两张图片发布新闻时,一切顺利,我们可以看到下面的输出

 "news-1" :
     {
       "newsId":"news-1", 
"images":
    {"image-1":{"id":"image-1","url":"https://www.mypublication.co.uk/image-1.jpeg","description":"title-1 description","height":400,"width":450},
        "image-2":{"id":"image-2","url":"https://www.mypublication.co.uk/image-2.jpeg","description":"title-2 description","height":400,"width":450}}
                }

当作者重新发布只有 image-3 的文章时,它正在输出所有三个图像(这就是聚合器) news-1 : [image-1, image-2, image-3]

我正在寻找任何其他替代方法来加入新闻和图像并在重新发布新闻时覆盖以前的值 新闻-1:[image-3]

【问题讨论】:

    标签: apache-kafka java-stream apache-kafka-streams confluent-platform confluent-cloud


    【解决方案1】:
    猜你喜欢
    • 2018-03-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-08
    • 2016-08-03
    • 2018-09-15
    相关资源
    最近更新 更多