【发布时间】:2019-11-20 18:08:08
【问题描述】:
我正在尝试使用对象的部分数据更新 KTable。
例如。用户对象是
{"id":1, "name":"Joe", "age":28}
该对象被流式传输到一个主题中,并按键分组到 KTable 中。
现在用户对象部分更新如下{"id":1, "age":33} 并流式传输到表中。但更新后的表格如下{"id":1, "name":null, "age":28}。
预期输出为{"id":1, "name":"Joe", "age":33}。
如何使用 Kafka 流和 Spring Cloud 流来实现预期的输出。任何建议,将不胜感激。谢谢。
这里是代码
@Bean
public Function<KStream<String, User>, KStream<String, User>> process() {
return input -> input.map((key, user) -> new KeyValue<String, User>(user.getId(), user))
.groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(User.class))).reduce((user1, user2) -> {
user1.merge(user2);
return user1;
}, Materialized.as("allusers")).toStream();
}
并使用以下代码修改了用户对象:
public void merge(Object newObject) {
assert this.getClass().getName().equals(newObject.getClass().getName());
for (Field field : this.getClass().getDeclaredFields()) {
for (Field newField : newObject.getClass().getDeclaredFields()) {
if (field.getName().equals(newField.getName())) {
try {
field.set(this, newField.get(newObject) == null ? field.get(this) : newField.get(newObject));
} catch (IllegalAccessException ignore) {
}
}
}
}
}
这是正确的方法还是 KStreams 中的任何其他方法?
【问题讨论】:
-
你的方法是正确的。正如@xcrazy360 所提到的,您应该创建一个返回的新对象,但不要改变传入的当前聚合和新值。
标签: apache-kafka-streams spring-kafka spring-cloud-stream