[Posted]: 2019-07-02 20:30:28
[Question]:
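I'm trying to create a Kafka Connect connector that sinks an Avro topic to a file.
Then I want to use Kafka Connect to restore that file into another topic.
The sink works fine: I can see the sink file growing and the data being written to it. But when I try to restore into a new topic, the new topic stays empty.
I get no errors. I have reset the offsets, created a new Kafka Connect worker and tried the restore again, and even created a brand-new Kafka cluster. The result is always the same: no errors on the source connector, but the topic is empty.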
Here is the output of the source connector config:
{
"name": "restored-exchange-rate-log",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"value.converter.schema.registry.url": "http://kafka-schema:8881",
"file": "/tmp/exchange-rate-log.sink.txt",
"format.include.keys": "true",
"source.auto.offset.reset": "earliest",
"tasks.max": "1",
"value.converter.schemas.enable": "true",
"name": "restored-exchange-rate-log",
"topic": "restored-exchange-rate-log",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
},
"tasks": [
{
"connector": "restored-exchange-rate-log",
"task": 0
}
],
"type": "source"
}
Here is the output of the source connector status:
{
"name": "restored-exchange-rate-log",
"connector": {
"state": "RUNNING",
"worker_id": "kafka-connect:8883"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "kafka-connect:8883"
}
],
"type": "source"
}
Here is the output of the sink connector config:
{
"name": "bkp-exchange-rate-log",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"source.auto.offset.reset": "earliest",
"tasks.max": "1",
"topics": "exchange-rate-log",
"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy",
"value.converter.schema.registry.url": "http://kafka-schema:8881",
"file": "/tmp/exchange-rate-log.sink.txt",
"format.include.keys": "true",
"value.converter.schemas.enable": "true",
"name": "bkp-exchange-rate-log",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
},
"tasks": [
{
"connector": "bkp-exchange-rate-log",
"task": 0
}
],
"type": "sink"
}
Here is the output of the sink connector status:
{
"name": "bkp-exchange-rate-log",
"connector": {
"state": "RUNNING",
"worker_id": "kafka-connect:8883"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "kafka-connect:8883"
}
],
"type": "sink"
}
The sink file is being written and keeps growing, but the topic restored-exchange-rate-log is completely empty.
Adding more details.
I have now tried the "Zalando" approach, but we don't use S3; we are using the FileStream connectors.
Here is the sink:
{
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"file": "/tmp/exchange-rate-log.bin",
"format.include.keys": "true",
"tasks.max": "1",
"topics": "exchange-rate-log",
"format": "binary",
"value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
"key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
"name": "bkp-exchange-rate-log"
}
Source:
{
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/tmp/exchange-rate-log.bin",
"format.include.keys": "true",
"tasks.max": "1",
"format": "binary",
"topic": "bin-test-exchange-rate-log",
"value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
"key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
"name": "restore-exchange-rate-log"
}
The sink connector looks fine: it produces the file /tmp/exchange-rate-log.bin, and the file keeps growing. The source (restore) connector, however, fails with this error:
Caused by: org.apache.kafka.connect.errors.DataException: bin-test-exchange-rate-log error: Not a byte array! [B@761db301
at com.spredfast.kafka.connect.s3.AlreadyBytesConverter.fromConnectData(AlreadyBytesConverter.java:22)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:269)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 more
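The error reads like a type mismatch. As far as I know, FileStreamSourceConnector emits each line of the file as a String value (STRING schema), while AlreadyBytesConverter only accepts values that are already a byte[]. The rejected value, [B@761db301, is most likely just a line of text that the sink wrote to the file. A minimal sketch of that check, assuming the converter does little more than an `instanceof byte[]` test (BytesOnlyCheck and its methods are hypothetical names, and it throws IllegalArgumentException instead of Connect's DataException so it stays self-contained):

```java
// Hypothetical stand-in for a "bytes only" converter check; not the real
// AlreadyBytesConverter source, just the behavior implied by the error message.
final class BytesOnlyCheck {
    static byte[] fromConnectData(String topic, Object value) {
        if (value == null) {
            return null; // assumption: null values pass through untouched
        }
        if (value instanceof byte[]) {
            return (byte[]) value; // already raw bytes: nothing to convert
        }
        // Anything else (e.g. a String line read from a file) is rejected.
        throw new IllegalArgumentException(topic + " error: Not a byte array! " + value);
    }

    // What a line-based file source hands to the converter: the line itself,
    // as a java.lang.String, never a byte[].
    static Object lineAsSourceValue(String line) {
        return line;
    }
}
```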
[Discussion]:
-
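Out of interest, what are you building here? Using files is usually an anti-pattern, especially if you already have one topic and want to write to another. docs.confluent.io/current/connect/kafka-connect-replicator/… might be worth a look.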
-
You need to use a byte array converter to dump and restore. For example, here's how it's done with S3: jobs.zalando.com/tech/blog/backing-up-kafka-zookeeper/…
-
@RobinMoffatt I'm trying to build a backup of my current topic, so I'm using a txt file to be able to keep versions over time and restore from them.
-
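@cricket_007 I'm already using the byte array converter. The sink file now has this kind of content: [B@2e2f84bd [B@6f34b60d [B@254544f8. But when I try to restore, it's the same thing: no data on the topic. Maybe the Zalando approach doesn't work with Avro?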
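Those [B@2e2f84bd lines are not the record data. As far as I can tell, FileStreamSinkTask writes each record value with PrintStream.println(Object), which calls String.valueOf(value); a byte[] does not override toString(), so what lands in the file is "[B@" plus the array's identity hash code in hex. The payload bytes are never written, so there is nothing to restore. A small sketch reproducing that conversion (SinkLineDemo is a hypothetical name, and PrintWriter stands in for PrintStream because its println(Object) performs the same String.valueOf conversion):

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical demo: how a println-based file sink turns a record value into text.
final class SinkLineDemo {
    static String writeLikeFileSink(Object value) {
        StringWriter text = new StringWriter();
        try (PrintWriter out = new PrintWriter(text)) {
            // println(Object) calls String.valueOf(value). For a byte[] that is
            // Object.toString(): "[B@" + identity hash code in hex, not the bytes.
            out.println(value);
        }
        // Strip the platform line separator that println appended.
        return text.toString().trim();
    }

    // The exact string Object.toString() yields for a byte[], for comparison.
    static String identityString(byte[] bytes) {
        return "[B@" + Integer.toHexString(System.identityHashCode(bytes));
    }
}
```

Two byte arrays with identical contents will normally print as different [B@... strings, which is another sign that the file holds object identities rather than data.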
-
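I've used the Zalando one to restore Avro just fine... In fact, binary data doesn't care whether it holds Avro, integers, strings, etc. Kafka only stores bytes, so if you want to back up and restore a topic (without worrying about serialization), the backup has to be binary, not human-readable.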
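If a human-readable, one-line-per-record file is still wanted, the raw key and value bytes would have to be encoded losslessly before being written, for example with Base64. Neither FileStream connector does this out of the box as far as I know; the helper below is a hypothetical sketch of the encoding step only, and wiring it into a connector (for example through a custom converter) is an assumption left open:

```java
import java.util.Base64;

// Hypothetical helper: turns one raw record (key/value bytes) into a single
// text line and back, without losing any bytes.
final class Base64RecordLine {
    private static final Base64.Encoder ENC = Base64.getEncoder();
    private static final Base64.Decoder DEC = Base64.getDecoder();
    // '-' is not in the standard Base64 alphabet, so it can mark a null key/value.
    private static final String NULL_MARK = "-";

    static String encode(byte[] key, byte[] value) {
        // Base64 output never contains '\t' or '\n', so the tab separator is
        // unambiguous and each record stays on exactly one line.
        return part(key) + "\t" + part(value);
    }

    static byte[][] decode(String line) {
        // limit -1 keeps a trailing empty field (an empty, non-null value).
        String[] parts = line.split("\t", -1);
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected key<TAB>value: " + line);
        }
        return new byte[][] {unpart(parts[0]), unpart(parts[1])};
    }

    private static String part(byte[] bytes) {
        return bytes == null ? NULL_MARK : ENC.encodeToString(bytes);
    }

    private static byte[] unpart(String field) {
        return NULL_MARK.equals(field) ? null : DEC.decode(field);
    }
}
```

The trade-off is file size (Base64 adds roughly a third) in exchange for a text file that diffs and versions cleanly while still round-tripping the exact bytes, Avro or otherwise.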
Tags: apache-kafka apache-kafka-connect