【问题标题】:Deserializing Avro message反序列化 Avro 消息
【发布时间】:2020-02-01 21:45:24
【问题描述】:

我从here 部署了 Kafka。我还像这样添加到docker-compose.yml Postgres 容器中:

postgres:
    image: postgres
    hostname: kafka-postgres
    container_name: kafka-postgres
    depends_on:
      - ksql-server
      - broker
      - schema-registry
      - connect
    ports:
      - 5432:5432

创建了一个主题页面浏览量。

我还使用设置创建了 DatagenConnector 并运行它。

{
  "name": "datagen-pageviews",
  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "kafka.topic": "pageviews",
  "max.interval": "100",
  "iterations": "999999999",
  "quickstart": "pageviews"
} 

据我所知,连接器为主题定义了一个模式:

{
  "type": "record",
  "name": "pageviews",
  "namespace": "ksql",
  "fields": [
    {
      "name": "viewtime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "pageid",
      "type": "string"
    }
  ],
  "connect.name": "ksql.pageviews"
} 

我的下一步是创建 JdbcSinkConnector,它将数据从 Kafka 主题传输到 Postgres 表。那行得通。连接器的设置:

{
  "name": "from-kafka-to-pg",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": [
    "pageviews"
  ],
  "connection.url": "jdbc:postgresql://kafka-postgres:5432/postgres",
  "connection.user": "postgres",
  "connection.password": "********",
  "auto.create": "true",
  "auto.evolve": "true"
}

然后我尝试自己向该主题发送消息。但失败并出现错误:

[2020-02-01 21:16:11,750] ERROR 在任务 to-pg-0 中遇到错误。 使用类执行阶段“VALUE_CONVERTER” 'io.confluent.connect.avro.AvroConverter',其中消耗的记录是 {topic='pageviews', partition=0, offset=23834, 时间戳=1580591160374,时间戳类型=创建时间}。 (org.apache.kafka.connect.runtime.errors.LogReporter) org.apache.kafka.connect.errors.DataException:反序列化失败 Avro 的主题页面浏览量数据:在 io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110) 在 org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104) 在 org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487) 在 org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464) 在 org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320) 在 org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224) 在 org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192) 在 org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177) 在 org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748) 引起: org.apache.kafka.common.errors.SerializationException: 错误 反序列化 id -1 的 Avro 消息原因: org.apache.kafka.common.errors.SerializationException:未知的魔法 字节!

所以发送方法很重要。我就是这样做的(Python,confluent-kafka-python):

producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.poll(0)
producer.produce(topic, json.dumps({
   'viewtime': 123,
   'userid': 'user_1',
   'pageid': 'page_1'
}).encode('utf8'), on_delivery=kafka_delivery_report)
producer.flush()

也许我应该提供带有消息的架构(AvroProducer)?

【问题讨论】:

  • 您使用的是哪个库? kafka-pythonconfluent-kafka-python?

标签: python apache-kafka avro apache-kafka-connect kafka-producer-api


【解决方案1】:

主题需要 Avro 类型的消息。

来自confluent-kafka-pythonAvroProducer 成功了:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer


value_schema_str = """
{
   "namespace": "ksql",
   "name": "value",
   "type": "record",
   "fields" : [
     {
       "name" : "viewtime",
       "type" : "long"
     }, 
     {
       "name" : "userid",
       "type" : "string"
     }, 
     {
       "name" : "pageid",
       "type" : "string"
     }
   ]
}
"""

key_schema_str = """
{
   "namespace": "ksql",
   "name": "key",
   "type": "record",
   "fields" : [
     {
       "name" : "pageid",
       "type" : "string"
     }
   ]
}
"""

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
value = {"name": "Value"}
key = {"name": "Key"}


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


avroProducer = AvroProducer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'on_delivery': delivery_report,
    'schema.registry.url': 'http://schema_registry_host:port'
    }, default_key_schema=key_schema, default_value_schema=value_schema)

avroProducer.produce(topic='my_topic', value=value, key=key)
avroProducer.flush()

【讨论】:

  • 那行得通。非常感谢。它会使消息大小翻倍吗?
  • @АльбертАлександров 不,它没有。为什么会这样?
  • 好吧,这是我的误解。但多亏了你,我比昨天更了解了。
  • 当然。所以,假设。每个生产者都定义了他发送到主题的数据模式。为同一主题生成数据的另一个生产者可以定义另一个(但仅兼容)模式。对?之前我认为我必须在控制中心定义主题模式并在代码中使用它。
  • 您的架构版本和兼容性应通过架构注册表进行管理。
【解决方案2】:

您的问题出现是因为您尝试使用 Avro 转换器非 Avro 的主题中读取数据。

有两种可能的解决方案:

1.切换 Kafka Connect 的接收器连接器以使用正确的转换器

例如,如果您将来自 Kafka 主题的 JSON 数据消费到 Kafka Connect 接收器中:

...
value.converter=org.apache.kafka.connect.json.JsonConverter. 
value.converter.schemas.enable=true/false
...

value.converter.schemas.enable 取决于消息是否包含架构..

2.将上游格式切换为 Avro

为了让DatagenConnector产生消息到消息值格式为Avro的Kafka,设置value.convertervalue.converter.schema.registry.url参数:

...
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
...

详情请参阅 kafka-connect-datagen docs


关于 Kafka Connect 转换器和序列化的 article 很棒。

【讨论】:

  • 感谢您的回答和链接。看来我不明白 Avro 主题是什么。这是否意味着我应该使用 confluent_kafka.avro.AvroProducer 而不是简单的 Producer?如果是这样,则会出现一些其他问题。我在控制中心描述了主题页面浏览的模式,它位于 api 端点0.0.0.0:8081/subjects 的列表中(名称:页面浏览值)。现在我应该复制它,以便在发送到 Kafka 主题时向模式提供消息?
猜你喜欢
  • 1970-01-01
  • 2019-07-12
  • 2020-08-29
  • 2020-11-22
  • 2019-04-11
  • 2021-09-06
  • 2021-03-31
  • 1970-01-01
  • 2017-06-29
相关资源
最近更新 更多