【Question title】: How to produce a tombstone Avro record in Kafka using .NET?
【Posted】: 2020-08-20 03:25:36
【Question】:

My sink.properties (the JDBC sink connector configuration):

{
  "name": "jdbc-oracle",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
    "connection.user": "ersin",
    "connection.password": "ersin!",
    "auto.create": "true",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "insert.mode": "upsert",
    "plugin.path": "/home/ersin/confluent-5.4.1/share/java/",
    "name": "jdbc-oracle"
  },
  "tasks": [
    {
      "connector": "jdbc-oracle",
      "task": 0
    }
  ],
  "type": "sink"
}

My connect-avro-distributed.properties:

bootstrap.servers=10.0.0.0:9092

group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.0.0.0:8081

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

The code:

var messageToSend = new Message<GenericRecord, GenericRecord>
{
    Key = recordKey
    // , Value = recordValue  (left out so that the value is null)
};

When I try to send a record with a null value, I get a null reference error.

How can I fix this error?

Thanks in advance.

【Comments】:

    Tags: .net apache-kafka avro kafka-producer-api confluent-schema-registry


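    【Solution 1】:
    // The value type is Null, so the value serializer never has to handle an Avro record.
    var tombstoner = new ProducerBuilder<int, Null>(_kafkaConfiguration.ProducerConfiguration)
        .SetKeySerializer(new AvroSerializer<int>(_schemaRegistryClient))
        .SetValueSerializer(Serializers.Null)
        .Build();

    // Select is lazily evaluated; nothing is produced until the sequence is enumerated.
    var tasks = properties.Select(property => tombstoner.ProduceAsync(
        "yourTopicName",
        new Message<int, Null> {
            Key = 100,    // placeholder: use the key of the record to delete
            Value = null, // a null value marks the record as a tombstone
            Timestamp = Timestamp.Default
        }
    ));

    // Task.WhenAll enumerates the sequence and waits for every delivery.
    await Task.WhenAll(tasks);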
    

    Note, however, that at the time of writing it is not possible to consume tombstone records with the confluent-kafka-dotnet client.
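
The answer above uses an `int` key, while the question uses a `GenericRecord` key. A minimal sketch of the same idea adapted to a `GenericRecord` key follows. The key schema (`OrderKey` with a single `int` field `id`) is an assumption chosen to line up with `pk.fields` in the sink config, and the helper names `BuildKey` and `BuildTombstone` are hypothetical. The broker address, Schema Registry URL, and the `orders` topic are taken from the question's configuration. The null reference in the question presumably comes from handing a null `GenericRecord` to the Avro value serializer; declaring the value type as `Null` and using `Serializers.Null` sidesteps that.

```csharp
using System.Threading.Tasks;
using Avro;
using Avro.Generic;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

public static class TombstoneExample
{
    // Assumed key schema (not from the original post): a record with one int field "id".
    private const string KeySchemaJson = @"{
        ""type"": ""record"",
        ""name"": ""OrderKey"",
        ""fields"": [ { ""name"": ""id"", ""type"": ""int"" } ]
    }";

    // Hypothetical helper: builds the Avro key for the row that should be deleted.
    public static GenericRecord BuildKey(int id)
    {
        // Fully qualified because Confluent.SchemaRegistry also defines a Schema type.
        var schema = (RecordSchema)Avro.Schema.Parse(KeySchemaJson);
        var key = new GenericRecord(schema);
        key.Add("id", id);
        return key;
    }

    // Hypothetical helper: a tombstone is a message with a key and a null value.
    public static Message<GenericRecord, Null> BuildTombstone(GenericRecord key)
        => new Message<GenericRecord, Null> { Key = key, Value = null };

    public static async Task Main()
    {
        // Addresses copied from the question's connect-avro-distributed.properties.
        var producerConfig = new ProducerConfig { BootstrapServers = "10.0.0.0:9092" };
        var registryConfig = new SchemaRegistryConfig { Url = "http://10.0.0.0:8081" };

        using var schemaRegistry = new CachedSchemaRegistryClient(registryConfig);
        using var producer = new ProducerBuilder<GenericRecord, Null>(producerConfig)
            .SetKeySerializer(new AvroSerializer<GenericRecord>(schemaRegistry))
            .SetValueSerializer(Serializers.Null) // writes a null payload, no Avro involved
            .Build();

        // 100 is a placeholder id, as in the answer above.
        var result = await producer.ProduceAsync("orders", BuildTombstone(BuildKey(100)));
        System.Console.WriteLine($"Tombstone written to {result.TopicPartitionOffset}");
    }
}
```

With `delete.enabled` set to `true` and `pk.mode` set to `record_key`, as in the question's sink config, the JDBC sink connector should treat such a record as a delete for the row identified by the key.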

    【Discussion】:
