【问题标题】:Kafka File Sink Connector - output JSONKafka 文件接收器连接器 - 输出 JSON
【发布时间】:2019-11-03 08:58:45
【问题描述】:

有人对 File Sink Connector 输出格式有疑问吗?

不知道为什么会收到这种格式

{op=u, before={withTax=1, unit=pc(s), deleted=0, updated_at=null, price=420000.0, name=Test 123, created_at=null, id=54, useTaxIncludedFormula=0}, after={withTax=1, unit=pc(s), deleted=0, updated_at=null, price=333.0, name=Test 123, created_at=null, id=54, useTaxIncludedFormula=0}, source={ts_sec=1561038448, query=null, thread=29638, server_id=100596, version=0.9.5.Final, file=mysql-bin.000012, connector=mysql, pos=1044011265, name=crm-kafka-connector, gtid=null, row=0, snapshot=false, db=gxapp_customer_db, table=registrationItemsTmp}, ts_ms=1561063703793}```

但是当我使用 kafka-console-consumer.sh 查看它时,我得到了有效的 JSON

{"before":{"id":54,"name":"Test 123","price":420000.0,"unit":"pc(s)","withTax":1,"created_at":null,"updated_at":null,"useTaxIncludedFormula":0,"deleted":0},"after":{"id":54,"name":"Test 123","price":333.0,"unit":"pc(s)","withTax":1,"created_at":null,"updated_at":null,"useTaxIncludedFormula":0,"deleted":0},"source":{"version":"0.9.5.Final","connector":"mysql","name":"crm-kafka-connector","server_id":100596,"ts_sec":1561038448,"gtid":null,"file":"mysql-bin.000012","pos":1044011265,"row":0,"snapshot":false,"thread":29638,"db":"gxapp_customer_db","table":"registrationItemsTmp","query":null},"op":"u","ts_ms":1561063703793}```

我已将所有内容设置为使用 JsonConverter

【问题讨论】:

    标签: apache-kafka apache-kafka-connect


    【解决方案1】:

    FileStreamSinkConnector 将根据它接收到的值写入数据。 JSONConverter 只是告诉 Kafka Connect 如何反序列化它收到的消息。

    如果您有关于主题的 JSON 只想转储到文件,那么在您的接收器连接器中使用

    "value.converter":"org.apache.kafka.connect.storage.StringConverter"
    

    此链接可能会提供更多有用的背景信息:https://rmoff.net/2019/05/08/when-a-kafka-connect-converter-is-not-a-converter/

    【讨论】:

    • 谢谢罗宾!这解决了它!现在一切正常。
    猜你喜欢
    • 2019-06-17
    • 2021-09-18
    • 2020-01-11
    • 2021-01-08
    • 2020-01-12
    • 2020-03-23
    • 2021-05-17
    • 2020-09-17
    • 1970-01-01
    相关资源
    最近更新 更多