【问题标题】:How to combine different messages to 1 output message with KStreams?如何使用 KStreams 将不同的消息组合成 1 条输出消息?
【发布时间】:2021-08-17 22:54:47
【问题描述】:

我对卡夫卡流世界真的很陌生,并且在这方面只有初学者水平的知识。 在我们当前的应用程序中,我们接收来自 Kafka 主题的 json 消息,格式如下:

消息1

{
  "projectId": "Project1",
  "userId": "User1",
  "actions": [
    {
      "APP_MESSAGE_ID": "27",
      "category": "FA"
    }
  ]
}

消息2

{
  "projectId": "Project1",
  "userId": "User1",
  "actions": [
    {
      "APP_MESSAGE_ID": "28",
      "category": "FA"
    }
  ]
}

消息3

{
  "projectId": "Project2",
  "userId": "User1",
  "actions": [
    {
      "APP_MESSAGE_ID": "29",
      "category": "PD"
    }
  ]
}

现在我们想以这种格式将其推送到目标主题:

所需目标输出消息

[
  {
    "projectId": "Project1",
    "data": [
      {
        "APP_MESSAGE_ID": "27",
        "category": "FA"
      },
      {
        "APP_MESSAGE_ID": "28",
        "category": "FA"
      }
    ]
  },
  {
    "projectId": "Project2",
    "data": [
      {
        "APP_MESSAGE_ID": "29",
        "category": "PD"
      }
    ]
  }
]

我已经设法为我的输入主题写了如下

KStreamBuilder builder = new KStreamBuilder();
Pattern pattern = Pattern.compile("Input-Message.*");
KStream<String, Object> sourceStream = builder.stream(pattern);
KTable aggregatedTable =
            sourceStream
                .filter((k, v) -> v != null)
                .mapValues(v -> getAMModelFromMessage(v))
                .groupBy((k, v) -> KeyValue.pair(v != null ? v.getProjectId() : null, v))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
                .aggregate(ArrayList::new, (k, v, list) -> {
                        list.add(v);
                        return list;
                    }
                    , Materialized.as("NewStore").withValueSerde(new ArrayListSerde(Serdes.String()))
                        .withKeySerde(Serdes.String()));

KStream<Object, ArrayList<AMKafkaMessage>> outputStream = aggregatedTable.toStream();

        

outputStream.to("destination_topic");

根据这个问题,我已经实现(使用)ArrayListSerde、ArrayListSerializer 和 ArrayListDeserializer 类: Issue with ArrayList Serde in Kafka Streams API

AMKafkaMessage 是我的模型类如下

import lombok.Data;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;

@Data
@JsonIgnoreProperties
public class AMKafkaMessage {

    private String projectId;
    private String userId;
}

并写在下面的方法中,通过从 json 解析它来获取它的实例

private AMKafkaMessage getAMModelFromMessage(Object message) {
        try {
            Gson gson = new Gson();
            
            AMKafkaMessage amKafkaMessage = gson.fromJson(message,
                AMKafkaMessage.class);
            return amKafkaMessage;
        } catch (Exception e) {
            logger.error("Exception occurred while decrypting message: {}", e.toString());
        }
        return null;
    }

并且想写到名为“destination_topic”的主题

"destination_topic"

任何帮助实现这一点将不胜感激。 另外,请让我知道我是否在做(或思考)关于如何使用流的错误方式。 提前致谢。

上面的代码给了我例外

   org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic kafka-stream-NewStore-repartition. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: org.apache.kafka.streams.KeyValue / value type: com.admin.model.kafkaModel.AMKafkaMessage). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:152) ~[kafka-streams-2.6.2.jar:?]
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:129) ~[kafka-streams-2.6.2.jar:?]

在这里失败

【问题讨论】:

    标签: kafka-consumer-api apache-kafka-streams spring-kafka confluent-platform


    【解决方案1】:

    错误消息表示指定的 Serdes 与实际数据类型不匹配。您确实为键和值指定了StringSerde / StringSerde(似乎它们来自StreamsConfig),但实际的数据类型是KeyValue / AMKafkaMessage

    鉴于您的程序,这是有道理的。

    .mapValues(v -> getAMModelFromMessage(v))
    

    这会将值转换为AMKafkaMessage。当您执行groupBy 时,您会触发重新分区,因此您需要使用Grouped 参数为该值设置相应的serde(否则,Kafka Streams 会退回到似乎设置为@987654328 的配置中的serde @)。

    .groupBy(..., Grouped.valueSerde(/* set Serded for AMKafkaMessage type */)
    

    此外,你做

    .groupBy((k, v) -> KeyValue.pair(v != null ? v.getProjectId() : null, v))
    

    这将为密钥返回一个KeyValue 对。请注意,groupBy 不会更改输入记录值,但指定的 KeyValueMapper 应该只返回新键。即,对于输入&lt;k1,v1&gt;,代码的结果键值对将是&lt;&lt;p1,v&gt;, v&gt;(假设p1 是从值中提取的)。

    我假设,您实际上想要这样做:

    .groupBy((k, v) -> v != null ? v.getProjectId() : null)
    

    将projectId设置为新key,得到&lt;p1,v&gt;作为结果?

    【讨论】:

    • 感谢您的建议,我设法将对象列表转换为 String 并最终使用 String serde 进行通信。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-07-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-02-27
    • 2013-11-20
    相关资源
    最近更新 更多