【问题标题】:Kafka stream - how to group by twice?卡夫卡流 - 如何分组两次?
【发布时间】:2018-07-06 12:43:12
【问题描述】:

我想创建一个条形图,显示图像中有多少像素颜色;图像每 3 秒更新一次,因此我的条形图也会更新。

我有一个收集 JSON 对象的主题,它的关键是图像创建日期,值是十六进制值(例如 #FFF)。

我想按键分组,所以它是按图像分组,然后按每个组的十六进制值分组并执行 .count()。

你是怎么做到的?

我在想streams.groupByKey()...然后按十六进制值分组但我需要将KTable转换为KStream...

更新

抱歉,我在手机上打字时缺乏解释。 我打算再解释一下。

顺便说一句,我改变了一些东西。如果你想阅读我在做什么,这里是我的 github:https://github.com/Lilmortal

  • 我的项目“HexGraph-source-connector”在 指定目录并将图片路径推送到主题。
  • “HexGraph”项目捡起来,使用Akka,actors会得到 所有像素十六进制代码单独并开始推送消息 转到另一个主题。
  • “HexGraph-stream”是我的 kafka 流部分。

但它很长,我怀疑你会读它lol。

无论我从某个主题中阅读,我都会收到类似 {imagePath: {hexCode: #fff}} 的消息。 图片路径是key,hexCode是value。我可以有一对多的 imagePath,所以我的想法是我的前端将有一个 websocket 来接收它。它将显示一个图像,其顶部有一个条形图,其中包含像素颜色代码的数量。例如有4个#fff、28个#fef等

因此我想按 imagePath 分组,然后我想计算该 imagePath 的每个像素。

例如:

  • {imagePath1: {hexCode: #fff, count: 47}}
  • {imagePath1: {hexCode: #fef, count: 61}}
  • {imagePath2: {hexCode: #fff, count: 23}}
  • {imagePath2: {hexCode: #fef, count: 55}}

所以这里 imagePath1 有 47 个#fff,而 imagePath2 有 23 个#fff。

这就是我想要做的。

【问题讨论】:

  • 您可以使用 KTable#toStream() 将 KTable 转换为 KStream。它不需要任何费用。但这在这里对您没有帮助,因为这样您将拥有一个聚合流,由聚合键键入,然后呢?

标签: stream apache-kafka apache-kafka-streams


【解决方案1】:

也许在分组之前通过复合键选择?像这样的:

SteamsBuilder topology = new StreamsBuilder();

topology.stream("input")
   .selectKey((k, v) -> k + v.hex)
   .groupByKey()
   .count()

这不会两次 groupBy,但会得到你想要的效果。


更新评论后:

class Image {
    public String imagePath;
}

class ImageAggregation {
    public String imagePath;
    public int count;
}

class ImageSerde implements Serde<Image> {
    // implement
}

class ImageAggregationSerde implements Serde<ImageAggregation> {
    // implement   
}

KTable<String, ImageAggregation> table = topology
  .stream("input", Consumed.with(new org.apache.kafka.common.serialization.Serdes.LongSerde(), new ImageSerde()))
  .groupBy((k, v) -> v.imagePath)
  .aggregate(ImageAggregation::new,
             (k, v, agg) -> {
                 agg.imagePath = v.imagePath;
                 agg.count = agg.count + 1;
                 return agg;
             }, Materialized.with(new org.apache.kafka.common.serialization.Serdes.StringSerde(), new ImageAggregationSerde());

更新 2 更新后:

class ImageHex {
    public String imagePath;
    public String hex;
}

class ImageHexAggregation {
    public String imagePath;
    public Map<String, Integer> counts;
}

class ImageHexSerde implements Serde<ImageHex> {
    // implement
}

class ImageHexAggregationSerde implements Serde<ImageHexAggregation> {
    // implement   
}

KTable<String, ImageHexAggregation> table = topology
  .stream("image-hex-observations", Consumed.with(new org.apache.kafka.common.serialization.Serdes.LongSerde(), new ImageSerde()))
  .groupBy((k, v) -> v.imagePath)
  .aggregate(ImageHexAggregation::new,
             (k, v, agg) -> {
                 agg.imagePath = v.imagePath;
                 Integer currentCount = agg.counts.getOrDefault(v.hex, 0)
                 agg.counts.put(v.hex, currentCount + 1));
                 return agg;
             }, Materialized.with(new org.apache.kafka.common.serialization.Serdes.StringSerde(), new ImageHexAggregationSerde());

【讨论】:

  • 您好,抱歉回复晚了。我在想:有一个包含诸如“{imagePath:#fff}”、“{imagePath:#efe}”之类的消息的 KStream。还有一个具有“{imagePath: (#fff, 13)}”的 KTable。当消息 {imagePath: #fff} 到达时,KTable 将更新为 {imagePath: (#fff, 14)}"。
  • 你的意思是这就是你想要的,对吧? KStream 变成了那个 KTable,并且计数是这样递增的吗?键是您要分组的时间戳?
  • 啊,是的,解决方案正在取得进展!我现在正在玩它,谢谢德米特里!但这计算了总共有多少 hexCodes,我需要计算每个图像。我详细更新了我的问题,希望对您有所帮助。非常感谢您坚持不懈!我真的很感激。
  • 嗯,我无处可去。我能想到的唯一解决方案是 ImageAggregation 有一个十六进制代码的哈希表作为键,计数作为每个十六进制代码的值。所以我将向我的最终主题发送消息:{imagePath: {#fff: 300, #fef: 454}}。这并不理想......我更喜欢一次发送一个十六进制代码。
  • 哦等等!我的反序列化中有一个错误!我要对此进行更多测试,它可能会起作用!
猜你喜欢
  • 2020-06-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-05-16
  • 2017-02-08
  • 2018-03-06
  • 2016-08-03
  • 2018-09-15
相关资源
最近更新 更多