【Posted】: 2020-04-25 14:32:03
【Question】:
I want to set a default value for a STRUCT field. My code is as follows:
SchemaBuilder schemaBuilder = SchemaBuilder.struct().name("homeAddress")
        .field("province", SchemaBuilder.STRING_SCHEMA)
        .field("city", SchemaBuilder.STRING_SCHEMA);
Struct defaultValue = new Struct(schemaBuilder.build())
        .put("province", "aaaa")
        .put("city", "bbbb");
Schema dataSchema = SchemaBuilder.struct().name("personMessage")
        .field("address", schemaBuilder.defaultValue(defaultValue).build())
        .build();
Struct struct = new Struct(dataSchema);
But I get the following error:
Exception in thread "main" org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:251)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
    ... 1 more
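I dug into the code of ConnectSchema.validateValue and found why the exception is thrown:
the schema attached to the value is a ConnectSchema, while the schema it is validated against is a SchemaBuilder, so the equals check fails and the exception is thrown.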
case STRUCT:
    Struct struct = (Struct) value;
    if (!struct.schema().equals(schema))
        throw new DataException("Struct schemas do not match.");
    struct.validate();
The equals method:
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConnectSchema schema = (ConnectSchema) o;
return Objects.equals(optional, schema.optional) &&
        Objects.equals(version, schema.version) &&
        Objects.equals(name, schema.name) &&
        Objects.equals(doc, schema.doc) &&
        Objects.equals(type, schema.type) &&
        Objects.deepEquals(defaultValue, schema.defaultValue) &&
        Objects.equals(fields, schema.fields) &&
        Objects.equals(keySchema, schema.keySchema) &&
        Objects.equals(valueSchema, schema.valueSchema) &&
        Objects.equals(parameters, schema.parameters);
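To make the mismatch concrete, here is a minimal, self-contained sketch that does not use Kafka Connect at all. MiniSchema, BuiltSchema, MiniBuilder, and structSchemasMatch are hypothetical stand-ins I made up for Schema, ConnectSchema, SchemaBuilder, and the STRUCT check. The only thing it assumes is the getClass() comparison from the equals method above.

```java
import java.util.Objects;

public class SchemaEqualsSketch {
    // Hypothetical stand-in for the Schema interface.
    interface MiniSchema {
        String name();
    }

    // Hypothetical stand-in for ConnectSchema: equals() requires the exact same class.
    static final class BuiltSchema implements MiniSchema {
        private final String name;

        BuiltSchema(String name) { this.name = name; }

        @Override public String name() { return name; }

        @Override public boolean equals(Object o) {
            if (this == o) return true;
            // Same class check as in the equals method quoted above.
            if (o == null || getClass() != o.getClass()) return false;
            return Objects.equals(name, ((BuiltSchema) o).name);
        }

        @Override public int hashCode() { return Objects.hash(name); }
    }

    // Hypothetical stand-in for SchemaBuilder: also a MiniSchema, but a different class.
    static final class MiniBuilder implements MiniSchema {
        private final String name;

        MiniBuilder(String name) { this.name = name; }

        @Override public String name() { return name; }

        BuiltSchema build() { return new BuiltSchema(name); }
    }

    // Mirrors the STRUCT check: the value's schema must equal the schema being validated.
    static boolean structSchemasMatch(MiniSchema valueSchema, MiniSchema expected) {
        return valueSchema.equals(expected);
    }

    public static void main(String[] args) {
        MiniBuilder builder = new MiniBuilder("homeAddress");
        MiniSchema valueSchema = builder.build();
        // Built schema vs. the builder itself: different classes, so no match.
        System.out.println(structSchemasMatch(valueSchema, builder));         // false
        // Built schema vs. another built schema with the same content: match.
        System.out.println(structSchemasMatch(valueSchema, builder.build())); // true
    }
}
```

In this sketch the comparison only succeeds when both sides are built schemas, which is consistent with the exception I see when the builder itself is passed to validateValue.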
Can anyone help me set a default value for a STRUCT type?
Below is the code of the 'defaultValue' method:
public SchemaBuilder defaultValue(Object value) {
    checkCanSet(DEFAULT_FIELD, defaultValue, value);
    checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD);
    try {
        ConnectSchema.validateValue(this, value);
    } catch (DataException e) {
        throw new SchemaBuilderException("Invalid default value", e);
    }
    defaultValue = value;
    return this;
}
If I change ConnectSchema.validateValue(this, value) to ConnectSchema.validateValue(this.builder(), value), it seems to work, but I don't know whether that is correct in other cases.
Thanks.
【Discussion】:
Tags: java apache-kafka apache-kafka-connect