[Posted]: 2021-08-17 22:54:47
[Question]:
I am new to Kafka Streams and have only beginner-level knowledge of it. Our current application receives JSON messages from a Kafka topic in the following format:
Message 1
{
  "projectId": "Project1",
  "userId": "User1",
  "actions": [
    {
      "APP_MESSAGE_ID": "27",
      "category": "FA"
    }
  ]
}
Message 2
{
  "projectId": "Project1",
  "userId": "User1",
  "actions": [
    {
      "APP_MESSAGE_ID": "28",
      "category": "FA"
    }
  ]
}
Message 3
{
  "projectId": "Project2",
  "userId": "User1",
  "actions": [
    {
      "APP_MESSAGE_ID": "29",
      "category": "PD"
    }
  ]
}
We now want to push these messages to a destination topic in this format:
Required destination output message
[
  {
    "projectId": "Project1",
    "data": [
      {
        "APP_MESSAGE_ID": "27",
        "category": "FA"
      },
      {
        "APP_MESSAGE_ID": "28",
        "category": "FA"
      }
    ]
  },
  {
    "projectId": "Project2",
    "data": [
      {
        "APP_MESSAGE_ID": "29",
        "category": "PD"
      }
    ]
  }
]
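Setting Kafka aside, the regrouping described above amounts to collecting the "actions" entries of every message under its "projectId". The following is a minimal plain-Java sketch of that transformation only, not a Kafka Streams topology. The names ProjectGroupingSketch, Action, InputMessage, groupByProject and sampleMessages are hypothetical and introduced purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the desired regrouping; no Kafka involved.
// All class and method names here are hypothetical.
class ProjectGroupingSketch {

    // One entry of the "actions" array.
    static final class Action {
        final String appMessageId;
        final String category;

        Action(String appMessageId, String category) {
            this.appMessageId = appMessageId;
            this.category = category;
        }
    }

    // One input message as shown in the question.
    static final class InputMessage {
        final String projectId;
        final String userId;
        final List<Action> actions;

        InputMessage(String projectId, String userId, List<Action> actions) {
            this.projectId = projectId;
            this.userId = userId;
            this.actions = actions;
        }
    }

    // Collects the actions of all messages under their projectId,
    // keeping projects in the order in which they were first seen.
    static Map<String, List<Action>> groupByProject(List<InputMessage> messages) {
        Map<String, List<Action>> dataByProject = new LinkedHashMap<>();
        for (InputMessage message : messages) {
            dataByProject
                .computeIfAbsent(message.projectId, id -> new ArrayList<>())
                .addAll(message.actions);
        }
        return dataByProject;
    }

    // The three sample messages from the question.
    static List<InputMessage> sampleMessages() {
        return Arrays.asList(
            new InputMessage("Project1", "User1", Arrays.asList(new Action("27", "FA"))),
            new InputMessage("Project1", "User1", Arrays.asList(new Action("28", "FA"))),
            new InputMessage("Project2", "User1", Arrays.asList(new Action("29", "PD"))));
    }

    public static void main(String[] args) {
        Map<String, List<Action>> grouped = groupByProject(sampleMessages());
        grouped.forEach((projectId, data) ->
            System.out.println(projectId + " -> " + data.size() + " action(s)"));
    }
}
```

With the three sample messages, Project1 ends up with the actions 27 and 28, and Project2 with the action 29, which matches the "data" arrays in the required output.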
So far I have managed to write the following for my input topics:
KStreamBuilder builder = new KStreamBuilder();
Pattern pattern = Pattern.compile("Input-Message.*");
KStream<String, Object> sourceStream = builder.stream(pattern);
KTable aggregatedTable =
    sourceStream
        .filter((k, v) -> v != null)
        .mapValues(v -> getAMModelFromMessage(v))
        .groupBy((k, v) -> KeyValue.pair(v != null ? v.getProjectId() : null, v))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
        .aggregate(
            ArrayList::new,
            (k, v, list) -> {
                list.add(v);
                return list;
            },
            Materialized.as("NewStore")
                .withValueSerde(new ArrayListSerde(Serdes.String()))
                .withKeySerde(Serdes.String()));
KStream<Object, ArrayList<AMKafkaMessage>> outputStream = aggregatedTable.toStream();
outputStream.to("destination_topic");
I implemented (reused) the ArrayListSerde, ArrayListSerializer, and ArrayListDeserializer classes from this question: Issue with ArrayList Serde in Kafka Streams API
AMKafkaMessage is my model class, shown below:
import lombok.Data;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;

@Data
@JsonIgnoreProperties
public class AMKafkaMessage {
    private String projectId;
    private String userId;
}
I also wrote the method below, which parses the JSON to obtain an instance of the model:
private AMKafkaMessage getAMModelFromMessage(Object message) {
    try {
        Gson gson = new Gson();
        // Gson.fromJson has no Object overload, so the payload is converted to
        // its string form first (assumes the record value is a JSON string).
        return gson.fromJson(message.toString(), AMKafkaMessage.class);
    } catch (Exception e) {
        logger.error("Exception occurred while parsing message: {}", e.toString());
    }
    // Reached only when parsing failed.
    return null;
}
I want to write the result to the topic named "destination_topic".
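Any help achieving this would be greatly appreciated. Also, please let me know if I am using (or thinking about) streams the wrong way. Thanks in advance.
The code above throws the following exception: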
org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic kafka-stream-NewStore-repartition. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: org.apache.kafka.streams.KeyValue / value type: com.admin.model.kafkaModel.AMKafkaMessage). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:152) ~[kafka-streams-2.6.2.jar:?]
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:129) ~[kafka-streams-2.6.2.jar:?]
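The message above reports that the key written to the repartition topic is a KeyValue while the configured key serializer is StringSerializer. In Kafka Streams, whatever the function passed to KStream#groupBy returns becomes the new record key, so a selector that returns a key-value pair produces a pair-typed key. The following plain-Java analogy, which does not use Kafka at all, sketches that mismatch under stated assumptions: Pair, serializeAsString and keySerializes are hypothetical stand-ins, not Kafka classes or methods.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.BiFunction;

// Plain-Java analogy of the serializer mismatch; no Kafka classes involved.
// All names here are hypothetical.
class KeySelectorMismatchSketch {

    // Stand-in for a key-value pair type.
    static final class Pair {
        final Object key;
        final Object value;

        Pair(Object key, Object value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for a String-only serializer: it casts its input to String.
    static byte[] serializeAsString(Object key) {
        return ((String) key).getBytes(StandardCharsets.UTF_8);
    }

    // Applies the selector, then tries to serialize whatever it returned as the key.
    // Returns false when the String-only serializer rejects the key type.
    static boolean keySerializes(BiFunction<String, String, Object> keySelector,
                                 String oldKey, String projectId) {
        Object newKey = keySelector.apply(oldKey, projectId);
        try {
            serializeAsString(newKey);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Selector returns a pair: the "key" is a Pair, so the String cast fails.
        System.out.println(keySerializes((k, id) -> new Pair(id, "payload"), null, "Project1"));
        // Selector returns only the project id: the key is a String and serializes.
        System.out.println(keySerializes((k, id) -> id, null, "Project1"));
    }
}
```

The same message also lists the value type as AMKafkaMessage against a StringSerializer, so the value serde used for the repartition step is likewise worth checking.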
[Discussion]:
Tags: kafka-consumer-api apache-kafka-streams spring-kafka confluent-platform