【问题标题】:Kafka-connect FileStreamSourceConnector doesn't produce to a topicKafka-connect FileStreamSourceConnector 不产生主题
【发布时间】:2019-07-02 20:30:28
【问题描述】:

我正在尝试创建一个 Kafka 连接连接器以从 AVRO 主题接收到文件。

然后使用 kafka-connect 将该文件恢复到另一个主题。

接收器工作正常,我可以看到接收器文件在增长并读取数据。但是当我尝试恢复到新主题时,新主题仍然没有数据..

我没有收到任何错误,我已经重置了偏移量,我创建了一个新的 kafka-connect 并尝试恢复,我创建了一个全新的 Kafka 集群,并且始终相同,源连接器上没有错误,但主题是空。

这里是源连接器配置的输出:

{
  "name": "restored-exchange-rate-log",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "value.converter.schema.registry.url": "http://kafka-schema:8881",
    "file": "/tmp/exchange-rate-log.sink.txt",
    "format.include.keys": "true",
    "source.auto.offset.reset": "earliest",
    "tasks.max": "1",
    "value.converter.schemas.enable": "true",
    "name": "restored-exchange-rate-log",
    "topic": "restored-exchange-rate-log",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  },
  "tasks": [
    {
      "connector": "restored-exchange-rate-log",
      "task": 0
    }
  ],
  "type": "source"
}

这里是源连接器状态的输出:

{
  "name": "restored-exchange-rate-log",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8883"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "kafka-connect:8883"
    }
  ],
  "type": "source"
}

这里是接收器连接器配置的输出:

{
    "name": "bkp-exchange-rate-log",
    "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "source.auto.offset.reset": "earliest",
    "tasks.max": "1",
    "topics": "exchange-rate-log",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy",
    "value.converter.schema.registry.url": "http://kafka-schema:8881",
    "file": "/tmp/exchange-rate-log.sink.txt",
    "format.include.keys": "true",
    "value.converter.schemas.enable": "true",
    "name": "bkp-exchange-rate-log",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    },
    "tasks": [
    {
        "connector": "bkp-exchange-rate-log",
        "task": 0
    }
    ],
    "type": "sink"
}

这里是sink连接器状态的输出:

{
    "name": "bkp-exchange-rate-log",
    "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8883"
    },
    "tasks": [
    {
        "state": "RUNNING",
        "id": 0,
        "worker_id": "kafka-connect:8883"
    }
    ],
    "type": "sink"
}

接收器文件正在工作,并且一直在增长,但主题 restore-exchange-rate-log 完全为空。


添加更多细节。

我现在尝试使用“Zalando”方式,但我们不使用 s3,我们使用的是 FileStream 连接器。

这里是水槽:

{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
  "file": "/tmp/exchange-rate-log.bin",
  "format.include.keys": "true",
  "tasks.max": "1",
  "topics": "exchange-rate-log",
  "format": "binary",
  "value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "name": "bkp-exchange-rate-log"
}

来源:

{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
  "file": "/tmp/exchange-rate-log.bin",
  "format.include.keys": "true",
  "tasks.max": "1",
  "format": "binary",
  "topic": "bin-test-exchange-rate-log",
  "value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "name": "restore-exchange-rate-log"
}

sink 连接器看起来很好,sink 生成了这个文件 /tmp/exchange-rate-log.bin 并且正在增加,但是 Source (Restore) 出现错误:

Caused by: org.apache.kafka.connect.errors.DataException: bin-test-exchange-rate-log error: Not a byte array! [B@761db301
    at com.spredfast.kafka.connect.s3.AlreadyBytesConverter.fromConnectData(AlreadyBytesConverter.java:22)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:269)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 11 more

【问题讨论】:

  • 出于兴趣,你在这里建什么?使用文件通常是一种反模式,特别是如果您已经有一个主题并想写另一个主题。 docs.confluent.io/current/connect/kafka-connect-replicator/… 可能值得一看
  • 您需要使用字节数组转换器来转储和恢复。例如,这是一个使用 S3 执行此操作的示例。 jobs.zalando.com/tech/blog/backing-up-kafka-zookeeper/…
  • @RobinMoffatt 我正在尝试从我当前的主题构建备份,因此我使用 txt 文件能够在一段时间内进行版本控制和恢复。
  • @cricket_007 我已经使用了字节数组。接收器文件现在有这种内容:[B@2e2f84bd [B@6f34b60d [B@254544f8 但是当我尝试恢复时..同样的事情没有关于该主题的数据。也许 Zalando 方式不适用于 avro?
  • 我用 Zalando 的一个罚款来恢复 Avro ......事实上,二进制数据并不关心是否有 Avro、整数、字符串等数据...... Kafka 只是存储字节,所以如果你想备份和恢复一个主题(不用担心序列化),那么它必须是二进制的,而不是人类可读的

标签: apache-kafka apache-kafka-connect


【解决方案1】:

我能够使用 kafka-avro-console-consumer 生成主题的“转储”。我们正在使用 SSL + Schema Registry。

这里是能够生成主题转储的命令行:

tpc=exchange-rate-log
SCHEMA_REGISTRY_OPTS="-Djavax.net.ssl.keyStore=. -Djavax.net.ssl.trustStore=. -Djavax.net.ssl.keyStorePassword=. -Djavax.net.ssl.trustStorePassword=." \
kafka-avro-console-consumer \
  --from-beginning --bootstrap-server $CONNECT_BOOTSTRAP_SERVERS \
  --property schema.registry.url=$CONNECT_SCHEMA_REGISTRY_URL \
  --topic $tpc --consumer-property security.protocol=SSL \
  --consumer-property ssl.truststore.location=/etc/ssl/kafkaproducer.truststore.jks \
  --consumer-property ssl.truststore.password=$MYPASS \
  --consumer-property ssl.keystore.location=/etc/ssl/kafkaproducer.keystore.jks \
  --consumer-property ssl.keystore.password=$MYPASS \
  --consumer-property ssl.key.password=$MYPASS \
  --property "key.separator=::-::" \
  --property "schema.id.separator=::_::" \
  --property print.schema.ids=true \
  --timeout-ms 15000 \
  --property "print.key=true" \
  --key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" > $tpc.dump

但我没有找到如何使用 kafka-avro-console-producer 将其导入回来的方法,因为它不适用于非 avro 密钥。 有了这个转储文件,我可以编写一个 python 生产者来读取该文件并恢复主题。

【讨论】:

    【解决方案2】:

    我不完全确定 Connect File 连接器是一个很好的用例。

    另外,Avro 转换器不会以可重现的格式转储文件。它看起来像Struct{field=value}

    如果您真的想转储到文件,只需执行kafka-avro-console-consumer,包含密钥,将--key-deserializer 作为字符串传递,然后使用> file.txt 将其写出

    要恢复,可以尝试使用 Avro 控制台生产者,但是没有字符串序列化器属性,所以需要引用键,我相信是传递给 JSON 解析器

    你可以这样测试

    echo '"1"|{"data":value"}'  > kafka-avro-console-producer...
    

    (也需要设置key.separator属性)

    做一个文件看起来像

    kafka-avro-console-producer...  < file.txt 
    

    如果你的整个 Kafka 集群消失了,而你只剩下这个文件,那么你还需要备份你的 Avro 模式(因为注册表 _schemas 主题没了)

    【讨论】:

    • 看起来无法将 kafka-avro-console-producer 与非 avro 密钥一起使用。如果我使用 --property "parse.key=true" 执行 kafka-avro-console-producer,它将需要 key.schema。我能够从模式注册表生成“转储”,包括密钥、avro 消息和模式 id。但是要生产(将文件导入回来),没有 key.schema 是行不通的
    • 对。就个人而言,我已经重写了控制台生产者来实现这一点。 Kafka Connect 也可以工作,但不能使用其中一个
    猜你喜欢
    • 1970-01-01
    • 2020-05-28
    • 2017-02-05
    • 2019-04-30
    • 2021-06-22
    • 2017-01-08
    • 2020-04-17
    • 2021-12-16
    • 2020-06-25
    相关资源
    最近更新 更多