【Question Title】: How to produce a Tombstone Avro Record in Kafka using Python?
【Posted】: 2020-07-26 11:17:13
【Question】:

My sink connector properties:

{
  "name": "jdbc-oracle",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
    "connection.user": "ersin",
    "connection.password": "ersin!",
    "auto.create": "true",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "insert.mode": "upsert",
    "plugin.path": "/home/ersin/confluent-5.4.1/share/java/",
    "name": "jdbc-oracle"
  },
  "tasks": [
    {
      "connector": "jdbc-oracle",
      "task": 0
    }
  ],
  "type": "sink"
}
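The JDBC sink only allows `delete.enabled=true` together with `pk.mode=record_key`. With that combination, it treats a record whose value is null (a tombstone) as a `DELETE` of the row identified by the record key. Because of `insert.mode=upsert`, a record with a non-null value is upserted. The snippet below is a toy in-memory model of that behaviour, not the connector's actual code. The `apply_sink_record` function and the dict that stands in for the Oracle table are hypothetical.

```python
from typing import Any, Dict, Optional

# Hypothetical stand-in for the target table: primary key -> row columns.
Table = Dict[int, Dict[str, Any]]


def apply_sink_record(table: Table, key: Dict[str, Any],
                      value: Optional[Dict[str, Any]]) -> None:
    """Toy model of delete.enabled=true, pk.mode=record_key, pk.fields=id."""
    pk = key["id"]  # with pk.mode=record_key, "id" is read from the record key
    if value is None:
        # Tombstone: the whole message value is null -> delete the row.
        table.pop(pk, None)
    else:
        # insert.mode=upsert: insert the row, or overwrite it if it exists.
        table[pk] = dict(value)


orders: Table = {}
apply_sink_record(orders, {"id": 1},
                  {"id": 1, "product": "myProduct", "quantity": 10, "price": 100})
apply_sink_record(orders, {"id": 1}, None)
print(orders)  # {}
```

The key point is that the delete is triggered by a null value for the whole message. The key must still arrive intact so the sink knows which row to remove.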

My connect-avro-distributed.properties:

bootstrap.servers=10.0.0.0:9092

group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.0.0.0:8081

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

I send the data like this:

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['10.0.0.0:9092'],
)
message=producer.send('orders', key=b'{"id":1}', value=None)

But it fails with a serialization error.

【Question Discussion】:

Tags: python apache-kafka kafka-producer-api kafka-python


【Solution 1】:

I assume you want to produce Avro messages, so you need to serialize them properly. I'll use the confluent-kafka-python library. If you haven't installed it yet, run:

pip install "confluent-kafka[avro]"

Below is an example AvroProducer that produces a tombstone, which is an Avro-encoded key with a null value:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer


value_schema_str = """
{
   "type":"record",
   "name":"myrecord",
   "fields":[
      {
         "name":"id",
         "type":[
            "null",
            "int"
         ],
         "default":null
      },
      {
         "name":"product",
         "type":[
            "null",
            "string"
         ],
         "default":null
      },
      {
         "name":"quantity",
         "type":[
            "null",
            "int"
         ],
         "default":null
      },
      {
         "name":"price",
         "type":[
            "null",
            "int"
         ],
         "default":null
      }
   ]
}
"""

key_schema_str = """
{
   "type":"record",
   "name":"key_schema",
   "fields":[
      {
         "name":"id",
         "type":"int"
      }
   ]
}
"""


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


if __name__ == '__main__':
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
    #value = {"id": 1, "product": "myProduct", "quantity": 10, "price": 100}
    key = {"id": 1}


    avroProducer = AvroProducer({
        'bootstrap.servers': '10.0.0.0:9092',
        'on_delivery': delivery_report,
        'schema.registry.url': 'http://10.0.0.0:8081'
    }, default_key_schema=key_schema, default_value_schema=value_schema)

    # No value is passed, so the message value is null: this is the tombstone.
    avroProducer.produce(topic='orders', key=key)
    avroProducer.flush()
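The kafka-python call in the question fails because `key.converter=io.confluent.connect.avro.AvroConverter` expects every non-null key and value in the Confluent wire format. That format is a zero magic byte, then the 4-byte big-endian schema ID from Schema Registry, then the Avro binary payload. `b'{"id":1}'` starts with `{`, so the converter cannot deserialize it. For a tombstone, only the value needs to be null. The key must still be Avro-encoded.

The sketch below hand-encodes the key for `key_schema` (a record with a single `int` field `id`) using only the standard library. It assumes you already know the ID the registry assigned to the key schema, which under the default subject naming is the subject `orders-key`. The schema ID 42 and the helper names are hypothetical. `AvroProducer` above performs this encoding and the registry lookup for you.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Confluent Schema Registry wire format


def zigzag_varint(n: int) -> bytes:
    """Avro binary encoding of an int/long: zig-zag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zig-zag maps small negatives to small positives
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_order_key(order_id: int, schema_id: int) -> bytes:
    """Hypothetical helper: encode {"id": order_id} for key_schema.

    An Avro record is encoded as its fields in order, so a record with one
    int field is just that int's encoding.
    """
    header = struct.pack(">bI", MAGIC_BYTE, schema_id)  # magic byte + 4-byte ID
    return header + zigzag_varint(order_id)


key_bytes = encode_order_key(1, schema_id=42)  # 42 is a made-up schema ID
print(key_bytes.hex())  # 000000002a02
```

With kafka-python, the tombstone would then be sent as `producer.send('orders', key=key_bytes, value=None)`, provided 42 is replaced by the real registered schema ID.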

【Discussion】:

【Solution 2】:

To allow an Avro field to be set to null, you need to add null as one of the field's possible types in your Avro schema.

See this example from the Avro documentation:

{
  "type": "record",
  "name": "yourRecord",
  "fields" : [
    {"name": "empId", "type": "long"},
    {"name": "empName", "type": ["null", "string"]}
  ]
}

Here empId is a mandatory field. empName is declared with the union type ["null", "string"], which makes it optional and allows it to be set to null.
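A nullable field is not the same thing as a tombstone. A record whose fields are all null still serializes to a non-empty Avro payload. Each `["null", ...]` union is written as the zig-zag-encoded index of the chosen branch, followed by that branch's value, and `null` has no value bytes. The JDBC sink only deletes when the whole message value is null. The standard-library sketch below uses a hypothetical `encode_nullable_string` helper to show the union encoding for a field like `empName`.

```python
from typing import Optional


def zigzag_varint(n: int) -> bytes:
    """Avro int/long encoding: zig-zag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_nullable_string(s: Optional[str]) -> bytes:
    """Encode a value of type ["null", "string"] (hypothetical helper)."""
    if s is None:
        return zigzag_varint(0)  # branch 0 = "null", no payload follows
    data = s.encode("utf-8")
    # Branch 1 = "string": the byte length as a long, then the UTF-8 bytes.
    return zigzag_varint(1) + zigzag_varint(len(data)) + data


print(encode_nullable_string(None))  # b'\x00'
print(encode_nullable_string("ab"))  # b'\x02\x04ab'
```

By the same rule, a value for Solution 1's schema with all four nullable fields set to null encodes as four `0x00` bytes. That is still a non-null message, so the sink would upsert it rather than delete the row.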

【Discussion】:

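  • The schema may also be an issue here, but the real problem is that he is using the wrong client, which is why he gets the serialization error.
  • @GiorgosMyrianthous How can I use your code without a schema? It works with a schema, but I want to use it without one.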