【Question title】: Kafka Connect is sending malformed JSON
【Posted】: 2020-04-25 04:12:35
【Question】:

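I'm trying to build a proof of concept using kafka-connect with the RabbitMQ connector. Basically, I have two simple Spring Boot applications: a RabbitMQ producer and a Kafka consumer. The consumer can't process the messages coming from the connector because the connector is somehow transforming my JSON messages: RabbitMQ sends {"transaction": "PAYMENT", "amount": "$125.0"} and kafka-connect prints X{"transaction": "PAYMENT", "amount": "$125.0"}. Note the X at the beginning. If I add a field, say "foo": "bar", the letter changes to t or something else.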

Dockerfile (connector)

FROM confluentinc/cp-kafka-connect-base:5.3.2
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-rabbitmq:latest

Build the image with docker build . -t rabbit-connector so that it can be referenced as rabbit-connector in the docker-compose file.

docker-compose.yml

version: '2'

networks:
  kafka-connect-network:
    driver: bridge

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.2
    networks: 
      - kafka-connect-network
    ports:
      - '31000:31000'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31000

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.3.2
    networks: 
      - kafka-connect-network
    ports:
      - '9092:9092'
      - '31001:31001'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31001

  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.2
    depends_on:
      - zookeeper
      - kafka
    networks: 
      - kafka-connect-network
    ports:
      - '8081:8081'
      - '31002:31002'
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_JMX_HOSTNAME: "localhost"
      SCHEMA_REGISTRY_JMX_PORT: 31002

  rabbitmq:
    image: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
      RABBITMQ_DEFAULT_VHOST: "/"
    networks: 
      - kafka-connect-network
    ports:
      - '15672:15672'
      - '5672:5672'

  kafka-connect:
    image: rabbit-connector
    networks: 
      - kafka-connect-network
    ports:
      - '8083:8083'
      - '31004:31004'
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "ERROR"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31004
    depends_on:
      - zookeeper
      - kafka
      - schema-registry
      - rabbitmq

  rest-proxy:
    image: confluentinc/cp-kafka-rest:5.3.2
    depends_on:
      - zookeeper
      - kafka
      - schema-registry
    networks: 
      - kafka-connect-network
    ports:
      - '8082:8082'
      - '31005:31005'
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'kafka:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KAFKAREST_JMX_HOSTNAME: "localhost"
      KAFKAREST_JMX_PORT: 31005

schema.avsc

{
  "type": "record",
  "name": "CustomMessage",
  "namespace": "com.poc.model",
  "fields": [
    {
      "name": "transaction",
      "type": "string"
    },
    {
      "name": "amount",
      "type": "string"
    }
  ]
}

So here I'm using StringConverter for the key (honestly, I don't care about it) and AvroConverter for the value. Maybe I'm missing something, or I've misconfigured my kafka-connect worker.

My connector configuration (connector-config.json) is:

 {
   "name" : "rabbit_to_kafka_poc",
   "config" : {
    "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max" : "1",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "kafka.topic" : "spectrum-message",
    "rabbitmq.queue" : "spectrum-queue",
    "rabbitmq.username": "guest",
    "rabbitmq.password": "guest",
    "rabbitmq.host": "rabbitmq",
    "rabbitmq.port": "5672",
    "rabbitmq.virtual.host": "/"
   } 
 }

To register the connector, I use curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @connector-config.json

Once everything is configured, I run the following command to print my messages:

kafka-avro-console-consumer --bootstrap-server localhost:9092 \
                            --topic spectrum-message \
                            --from-beginning

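The JSON starts with a letter, so my question is: why is this happening? I suspected something was encoding my message, but my RabbitMQ producer sends plain JSON. I confirmed this by testing with a RabbitMQ consumer and by debugging my application at the point where it sends the message.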

【Discussion】:

    Tags: apache-kafka apache-kafka-connect


    【Solution 1】:

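    You need to use ByteArrayConverter. What the connector pulls from RabbitMQ is just bytes; it makes no attempt to coerce them into a schema. Even if you serialise it as Avro, the schema is just a bytes field: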

    $ curl -s -XGET localhost:8081/subjects/rabbit-test-avro-00-value/versions/1 | jq '.'
    {
      "subject": "rabbit-test-avro-00-value",
      "version": 1,
      "id": 1,
      "schema": "\"bytes\""
    }
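Following this answer, here is a minimal sketch of the corrected connector config, assuming the Connect REST API is reachable at http://localhost:8083 as published in the docker-compose file. It reuses the settings from connector-config.json, switches value.converter to org.apache.kafka.connect.converters.ByteArrayConverter, and drops value.converter.schema.registry.url, which ByteArrayConverter does not use. A converter set on the connector overrides the worker's CONNECT_VALUE_CONVERTER. The sketch uses PUT /connectors/{name}/config, which creates the connector if it does not exist and updates it if it does; the Python function names are illustrative only.

```python
import json
import urllib.request

# Assumption: the Connect REST port published as 8083 in docker-compose.yml.
CONNECT_URL = "http://localhost:8083"
CONNECTOR_NAME = "rabbit_to_kafka_poc"


def connector_config() -> dict:
    """Settings from connector-config.json with the value converter swapped."""
    return {
        "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        # Write the RabbitMQ message body to Kafka untouched (no Avro framing).
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "kafka.topic": "spectrum-message",
        "rabbitmq.queue": "spectrum-queue",
        "rabbitmq.username": "guest",
        "rabbitmq.password": "guest",
        "rabbitmq.host": "rabbitmq",
        "rabbitmq.port": "5672",
        "rabbitmq.virtual.host": "/",
    }


def put_connector_config(name: str, config: dict, base_url: str = CONNECT_URL) -> int:
    """Create or update a connector via PUT /connectors/{name}/config; return the HTTP status."""
    request = urllib.request.Request(
        f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request) as response:
        return response.status
```

Against a running worker, put_connector_config(CONNECTOR_NAME, connector_config()) should return 201 on creation or 200 on update. Messages already written to spectrum-message with Avro framing stay in the topic, so reading with --from-beginning will still show the old prefixed values.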
    

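    If you want to write it to a topic as Avro with a schema (which is a good idea), use something like Kafka Streams or ksqlDB: apply a stream processor to the source topic that Kafka Connect writes with ByteArrayConverter.

    For example, in ksqlDB you would do: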

    -- Inspect the topic - ksqlDB recognises the format as JSON
    
    ksql> PRINT 'rabbit-test-00' FROM BEGINNING;
    Format:JSON
    {"ROWTIME":1578477403591,"ROWKEY":"null","transaction":"PAYMENT","amount":"$125.0"}
    {"ROWTIME":1578477598555,"ROWKEY":"null","transaction":"PAYMENT","amount":"$125.0"}
    
    -- Declare the schema
    CREATE STREAM rabbit (transaction VARCHAR,
                          amount VARCHAR)
      WITH (KAFKA_TOPIC='rabbit-test-00',
            VALUE_FORMAT='JSON');
    
    -- Reserialise to Avro
    CREATE STREAM TRANSACTIONS WITH (VALUE_FORMAT='AVRO', 
                                     KAFKA_TOPIC='reserialised_data') AS
      SELECT *
        FROM rabbit
        EMIT CHANGES;
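To make the reserialisation step concrete, here is a stdlib-only Python sketch of what an Avro record under the question's schema.avsc looks like once a stream processor has parsed the raw JSON. It is not ksqlDB's implementation: ksqlDB registers its own schema for the output topic, which will not be identical to schema.avsc. What it illustrates is the difference from the "bytes" case: an Avro record is just its fields encoded back to back in schema order (transaction, then amount), each string written as a length prefix plus UTF-8 bytes, with no field names on the wire, so a consumer needs the registered schema to read it. The function names and schema ID are hypothetical.

```python
import json
import struct

# Field order taken from schema.avsc (record CustomMessage); both fields are Avro strings.
CUSTOM_MESSAGE_FIELDS = ("transaction", "amount")


def encode_avro_long(n: int) -> bytes:
    """Encode an integer as an Avro long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z >= 0x80:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_avro_string(value: str) -> bytes:
    """An Avro string is its UTF-8 byte length followed by the UTF-8 bytes."""
    data = value.encode("utf-8")
    return encode_avro_long(len(data)) + data


def reserialise(raw_json: bytes, schema_id: int) -> bytes:
    """Parse a schemaless JSON value and emit it as a Confluent-framed CustomMessage record."""
    record = json.loads(raw_json)
    # Fields are written in schema order; JSON key order does not matter.
    body = b"".join(encode_avro_string(record[name]) for name in CUSTOM_MESSAGE_FIELDS)
    # Confluent wire format: magic byte 0, then a 4-byte big-endian schema ID.
    return struct.pack(">bI", 0, schema_id) + body


framed = reserialise(b'{"transaction": "PAYMENT", "amount": "$125.0"}', schema_id=1)
print(framed)
```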
    

    For more details, see this blog I wrote.

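    【Discussion】:

    • I figured it out myself yesterday, but I didn't know the ksqlDB pattern. Thanks. Is it possible to generate that schema from a POJO in Java, or does it have to be done manually?
    • If you want to map data from a schemaless format (e.g. JSON) to a schema, you have to declare it manually in ksqlDB.
    【Solution 2】:

    You don't have JSON messages; you have Avro messages in Kafka, because you are using AvroConverter.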

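    That letter isn't actually a letter; it's your terminal showing a UTF-8 rendering of the first 5 bytes of binary data. This typically happens when you use the regular console consumer rather than kafka-avro-console-consumer, which correctly parses the bytes in the topic to obtain the Avro data.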
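To see where the stray letter comes from, here is a minimal stdlib-only Python sketch, assuming the value was written by AvroConverter with the plain "bytes" schema shown in Solution 1. The Confluent wire format prepends a magic byte 0x00 and a 4-byte big-endian schema ID; for schema ID 21 all five of those bytes are non-printable. An Avro bytes value then follows as its length, encoded as a zigzag varint, plus the raw bytes. For payloads shorter than 64 bytes that length prefix is a single byte equal to twice the length, which is often printable: X (0x58) corresponds to a 44-byte payload and t (0x74) to a 58-byte one, a 14-byte difference consistent with adding , "foo": "bar". The helper names are illustrative and not part of any Confluent library.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent wire-format message


def encode_avro_long(n: int) -> bytes:
    """Encode an integer as an Avro long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z >= 0x80:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def decode_avro_long(buf: bytes, pos: int) -> tuple[int, int]:
    """Decode an Avro long starting at buf[pos]; return (value, next position)."""
    shift = 0
    z = 0
    while True:
        b = buf[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1), pos


def frame_bytes_value(schema_id: int, payload: bytes) -> bytes:
    """Wrap raw bytes the way AvroConverter does when the schema is just "bytes"."""
    header = struct.pack(">bI", MAGIC_BYTE, schema_id)  # 1 magic byte + 4-byte schema ID
    return header + encode_avro_long(len(payload)) + payload


def unframe_bytes_value(message: bytes) -> tuple[int, bytes]:
    """Inverse of frame_bytes_value: return (schema ID, original payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"not Confluent wire format (magic byte {magic})")
    length, pos = decode_avro_long(message, 5)
    return schema_id, message[pos:pos + length]


payload = b'{"transaction": "PAYMENT", "amount": "$125.0"}'
message = frame_bytes_value(21, payload)
# A plain console consumer prints every byte of `message`: five unprintable
# header bytes, one length byte, then the JSON itself.
print(message)
print(unframe_bytes_value(message))
```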

    If you want the whole JSON, use JsonConverter instead.

    【Discussion】:

    • Sorry, that was a typo. I am using kafka-avro-console-consumer. Edited.
    • Did you also pass --property schema.registry.url?
    • Yes, and it throws an error because of the X at the beginning: ``` Processed a total of 1 messages [2020-01-07 20:51:34,685] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:76) org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 21 Caused by: java.net.MalformedURLException: no protocol: /schemas/ids/21 at java.net.URL.<init>(URL.java:610) at java.net.URL.<init>(URL.java:507) ``` So I ran curl http://localhost:8081/schemas/ids/21 -> {"schema":"\"bytes\""}% ??
    • Apparently my schema is wrong. I'll try posting the schema again.
    • I created my schema with curl -XPOST -H "Content-Type:application/json" -d"{\"schema\":$SCHEMA}" http://localhost:8081/subjects/spectrum-message/versions and ran curl http://localhost:8081/schemas/ids/1, which returns {"schema":"{\"type\":\"record\",\"name\":\"SpectrumMessage\",\"namespace\":\"com.upgrade.model\",\"fields\":[{\"name\":\"transaction\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"string\"}]}"}, but the error persists: java.net.MalformedURLException: no protocol: /schemas/ids/2, and curl http://localhost:8081/schemas/ids/2 -> {"schema":"\"bytes\""}%