【发布时间】:2020-04-29 06:52:09
【问题描述】:
我有一个生产者类使用来自Github 的自定义 JsonSerializer 发送到一个主题
public class JsonSerializer<T> implements Serializer<T> {
...
@Override
public byte[] serialize(String topic, T data) {
try {
return this.objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException(e);
}
}
...
}
我正在使用这些配置运行 Datastax Kafka 连接器:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
连接器尝试使用主题时出现这些错误:
[2020-01-12 13:57:53,324] WARN Error inserting/updating row for Kafka record SinkRecord{kafkaOffset=416, timestampType=CreateTime} ConnectRecord{topic='test-3', kafkaPartition=17, key=null, keySchema=Schema{STRING}, value={}, valueSchema=null, timestamp=1578811437723, headers=ConnectHeaders(headers=)}: Primary key column(s) mmsi, ts cannot be left unmapped. Check that your mapping setting matches your dataset contents. (com.datastax.kafkaconnector.DseSinkTask:286)
从那个错误中,我认为连接器无法检索 Json 数据。我做错了什么?
更新
我尝试了 Kafka JsonSerializer。
我试过 StringSerializer,因为连接器说它也受支持。
我发现确实有一些数据写入数据库,但与 kafka 主题发送的总数据相比,它总是相对较小的数字。大约 5 到 10 个数据。
我试图让连接器保持运行,我发现它写入失败后,它就不会再写入了。
【问题讨论】:
-
连接器配置中的映射是什么,数据库架构是什么?
-
根据错误,你没有写入数据库的价值
value={}, valueSchema=null -
@cricket_007 经过一番研究,我尝试使用 kafka jsonserializer。另一个错误发生了。拿到电脑后,我会在这里更新。
标签: java apache-kafka cassandra datastax apache-kafka-connect