【问题标题】:Update KTable based on partial data attributes根据部分数据属性更新KTable
【发布时间】:2019-11-20 18:08:08
【问题描述】:

我正在尝试使用对象的部分数据更新 KTable。 例如。用户对象是 {"id":1, "name":"Joe", "age":28} 该对象被流式传输到一个主题中,并按键分组到 KTable 中。 现在用户对象部分更新如下{"id":1, "age":33} 并流式传输到表中。但更新后的表格如下{"id":1, "name":null, "age":28}。 预期输出为{"id":1, "name":"Joe", "age":33}。 如何使用 Kafka 流和 Spring Cloud 流来实现预期的输出。任何建议,将不胜感激。谢谢。

这里是代码

 @Bean
        public Function<KStream<String, User>, KStream<String, User>> process() {
            return input -> input.map((key, user) -> new KeyValue<String, User>(user.getId(), user))
                    .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(User.class))).reduce((user1, user2) -> {
                        user1.merge(user2);
                        return user1;
                    }, Materialized.as("allusers")).toStream();
        }

并使用以下代码修改了用户对象:

    public void merge(Object newObject) {
        assert this.getClass().getName().equals(newObject.getClass().getName());
        for (Field field : this.getClass().getDeclaredFields()) {
            for (Field newField : newObject.getClass().getDeclaredFields()) {
                if (field.getName().equals(newField.getName())) {
                    try {
                        field.set(this, newField.get(newObject) == null ? field.get(this) : newField.get(newObject));
                    } catch (IllegalAccessException ignore) {
                    }
                }
            }
        }
    }

这是正确的方法还是 KStreams 中的任何其他方法?

【问题讨论】:

  • 你的方法是正确的。正如@xcrazy360 所提到的,您应该创建一个返回的新对象,但不要改变传入的当前聚合和新值。

标签: apache-kafka-streams spring-kafka spring-cloud-stream


【解决方案1】:

我已经测试了您的合并代码,它似乎按预期工作。但是由于reduce之后的结果是{"id":1, "name":null, "age":28},我可以想到两件事:

  • 您的状态根本没有更新,因为没有任何属性发生变化。
  • 可能你有序列化问题,因为字符串属性为空,但其他 int 属性没问题。

我的猜测是,因为您正在改变原始对象并返回相同的值,所以 kafka 流不会将其检测为更改并且不会存储新状态。实际上,您不应该改变您的对象,因为它可能导致不确定性,具体取决于您的管道。

尝试更改您的merge 函数以创建一个新的User 对象,并查看行为是否发生变化。

【讨论】:

  • 感谢@xcrazy360 的回复。没有 '''merge''' 更新后的数据是 '''{"id":1, "name":null, "age":28}''' 。使用 '''merge''',表格正在更新。我想知道这是否是 KStreams 和 Cloud Streams 世界中的正确方法,或者还有其他更有效的方法。我是流媒体开发的新手,因此提出了这个问题。
  • 你的方法是正确的。正如@xcrazy360 所提到的,您应该创建一个返回的新对象,但不要改变传入的当前聚合和新值。
【解决方案2】:

所以这里是合并 2 个对象的推荐通用方法,请在此处发表评论。为此,被合并的对象应该有一个空的构造函数。

     public <T> T mergeObjects(T first, T second) {
        Class<?> clazz = first.getClass();
        Field[] fields = clazz.getDeclaredFields();
        Object newObject = null;
        try {
            newObject = clazz.getDeclaredConstructor().newInstance();
            for (Field field : fields) {
                field.setAccessible(true);
                Object value1 = field.get(first);
                Object value2 = field.get(second);
                Object value = (value2 == null) ? value1 : value2;
                field.set(newObject, value);
            }
        } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
                | InvocationTargetException | NoSuchMethodException | SecurityException e) {

            e.printStackTrace();
        }
        return (T) newObject;
    }

【讨论】:

    猜你喜欢
    • 2015-12-07
    • 1970-01-01
    • 2018-02-21
    • 1970-01-01
    • 1970-01-01
    • 2023-04-08
    • 1970-01-01
    • 2023-03-21
    • 1970-01-01
    相关资源
    最近更新 更多