【问题标题】:Kafka Connect API error when set default value of STRUCT type设置 STRUCT 类型的默认值时 Kafka Connect API 错误
【发布时间】:2020-04-25 14:32:03
【问题描述】:

我想设置一个默认值STRUCT,代码如下:

        SchemaBuilder schemaBuilder = SchemaBuilder.struct().name("homeAddress")
                .field("province", SchemaBuilder.STRING_SCHEMA)
                .field("city", SchemaBuilder.STRING_SCHEMA);
        Struct defaultValue = new Struct(schemaBuilder.build())
                .put("province", "aaaa")
                .put("city", "bbbb");
        Schema dataSchema = SchemaBuilder.struct().name("personMessage")
                .field("address", schemaBuilder.defaultValue(defaultValue).build()).build();
        Struct struct = new Struct(dataSchema);

但我得到如下错误

Exception in thread "main" org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:251)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
    ... 1 more

我挖掘了 ConnectSchema.validateValue 的代码,发现为什么会抛出异常,

值的模式类型是ConnectSchema,但另一个是SchemaBuilder,然后抛出异常。

case STRUCT:
    Struct struct = (Struct) value;
    if (!struct.schema().equals(schema))
        throw new DataException("Struct schemas do not match.");
    struct.validate();

等号方法

        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ConnectSchema schema = (ConnectSchema) o;
        return Objects.equals(optional, schema.optional) &&
                Objects.equals(version, schema.version) &&
                Objects.equals(name, schema.name) &&
                Objects.equals(doc, schema.doc) &&
                Objects.equals(type, schema.type) &&
                Objects.deepEquals(defaultValue, schema.defaultValue) &&
                Objects.equals(fields, schema.fields) &&
                Objects.equals(keySchema, schema.keySchema) &&
                Objects.equals(valueSchema, schema.valueSchema) &&
                Objects.equals(parameters, schema.parameters);

谁能帮忙设置STRUCT类型的默认值

下面是方法'defaultValue'的代码:

public SchemaBuilder defaultValue(Object value) {
        checkCanSet(DEFAULT_FIELD, defaultValue, value);
        checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD);
        try {
            ConnectSchema.validateValue(this, value);
        } catch (DataException e) {
            throw new SchemaBuilderException("Invalid default value", e);
        }
        defaultValue = value;
        return this;
    }

如果我将 ConnectSchema.validateValue(this, value) 更改为 ConnectSchema.validateValue(this.builder(), value) 似乎可以,我不知道其他情况是否可以。

谢谢。

【问题讨论】:

    标签: java apache-kafka apache-kafka-connect


    【解决方案1】:

    改变这个 Struct defaultValue = new Struct(schemaBuilder.build())
    Struct defaultValue = new Struct(schemaBuilder)

    ConnectSchemaSchemaBuilder 两个类都在实现 Schema schemaBuilder.schemaBuilder.build() 将所有值传递给 ConnectSchema 构造函数并返回新的 Schema 对象,并且该对象是不可变的。

    看看这个。

    System.out.println(struct); //empty 
    System.out.println(struct.get("address"));  //default values
    System.out.println(  ((Struct)struct.get("address")).getString("city")   ); //default values
    System.out.println(  ((Struct)struct.get("address")).getString("province")   ); //default values
    

    默认值不会重写为结构字段。它们只存在于模式定义中,如果字段为空,则从模式定义中返回默认值。

    来自 Struct.class

      public Object get(Field field) {
            Object val = this.values[field.index()];
            if (val == null && field.schema().defaultValue() != null) {
                val = field.schema().defaultValue();
            }
    
            return val;
        }
    

    【讨论】:

    • 感谢您的回复。我正在听从你的建议,但 struct 的对象是空的。我在最后一行添加 system.out.printlin(struct.toString()),然后输出是“Struct{}”
    • 我认为默认值只能在创建模式时设置,不知道是否正确。
    • 我很确定你会问这个空的 Struct{} 。答案中有更多解释。
    • 了解,测试通过,万分感谢您的帮助。
    • 嗨 @Arkadiusz Łukasiewicz,当我使用该解决方案并使用 JsonConvertor 发送记录时,我收到另一个错误。我打开一个新的讨论。stackoverflow.com/questions/59729574/…,谢谢你的帮助。
    猜你喜欢
    • 2018-10-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-24
    • 2017-02-06
    • 2020-01-02
    • 2014-08-18
    相关资源
    最近更新 更多