【问题标题】:How to serialize/deserialize from ksql avro format to c# using confluent platform如何使用融合平台将 ksql avro 格式序列化/反序列化为 c#
【发布时间】:2021-07-29 08:43:23
【问题描述】:

我正在使用KsqlDb 表格,表格如下:

KSQL-DB 查询
create table currency (id integer,name varchar) with (kafka_topic='currency',partitions=1,value_format='avro');

C# 模型

public class Currency
{
    public int Id{get;set;}
    public string Name{get;set;}
}

现在我想知道我应该如何使用 Confluent 库在 C# 中从这个主题写入/读取数据:

写作

 IProducer<int, Currency> producer=....

 Currency cur=new Currency();

 Message<int,Currency> message = new Message<int, Currency>
            {
                Key = msg.Id,
                Timestamp = new Timestamp(DateTime.UtcNow, TimestampType.CreateTime),
                Value = msg
            };
 DeliveryResult<int,Currency> delivery =  await this.producer.ProduceAsync(topic,message);

阅读

IConsumer<int,Currency> iconsumer = new ConsumerBuilder<int, Currency>(config)
                .SetKeyDeserializer(Deserializers.Int32) //i assume i need to use the id from my dto
                .SetValueDeserializer(...) //what deserializer
                .Build();

ConsumeResult<int,Currency> result = consumer.Consume();

Currency message =  // what deserializer JsonSerializer.Deserialize<Currency>(result.Message.Value);

我不知道该怎么做,所以我尝试寻找序列化程序。我找到了这个库AvroSerializer,但我不知道作者从哪里获取schema

关于如何读取/写入与我的ksqldb 模型匹配的特定主题的任何帮助?

更新

经过一些研究和一些答案,我已经开始使用schemaRegistry

var config = new ConsumerConfig
            {
                GroupId = kafkaConfig.ConsumerGroup,
                BootstrapServers = kafkaConfig.ServerUrl,
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
var schemaRegistryConfig = new SchemaRegistryConfig
            {
                Url = kafkaConfig.SchemaRegistryUrl
            };
var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

IConsumer<int,Currency> consumer = new ConsumerBuilder<int, Currency>(config)
            .SetKeyDeserializer(new AvroDeserializer<int>(schemaRegistry).AsSyncOverAsync())
            .SetValueDeserializer(new AvroDeserializer<Currency>(schemaRegistry).AsSyncOverAsync())
            .Build();

ConsumeResult<int, Currency> result = consumer.Consume();

现在我收到另一个错误:

期望长度为 5 字节或更长但总数据大小的数据帧 是 4 个字节

正如有人好心指出的那样,我似乎只从模式注册表中检索 id。

我怎样才能:insert into currency (id,name) values (1,3) 并在 C# 中将其作为 POCO 检索(如上所列)?

更新 2

在我找到这个source 程序后,似乎由于某种原因我无法将消息发布到表中。

发送消息没有错误但没有发布到Kafka。

【问题讨论】:

    标签: c# serialization avro ksqldb confluent-kafka-dotnet


    【解决方案1】:

    我找到了这个库 AvroSerializer ,但我不知道作者从哪里获取架构。

    不清楚为什么您需要使用 Confluent 以外的库,但他们知道 from the Schema Registry。您可以使用CachedSchemaRegistryClient 轻松获取架构字符串,但您不需要在代码中使用它,因为反序列化程序会自行从注册表下载。

    如果您参考examples/ in the confluent-kafka-dotnet repo for Specific Avro consumption,您可以看到他们从User.avsc 文件生成User 类,这似乎正是您想要在这里为Currency 做的事情,而不是自己编写它

    【讨论】:

    • 你指的是哪个库the Confluent one。我还没有看到关于如何序列化/反序列化 POCO 的示例。我只是想要一种简单的方法来将我的POCO c# 模型序列化为 avro 格式,将其插入 kafka,使用ksql 对其进行转换,然后从输出主题中使用并从 AVRO 反序列化为另一个 C# 模型
    • This library from Confluent(链接到示例) - 没有定义 POCO。它生成它。该库已导入到您问题中链接的库中,因此无论如何您都会安装它。 KSQL 中的 Avro 也需要 not “plain Avro” 的模式注册表
    • 我已经过了那部分,现在当我使用消息时,它说最小字节长度是 5,我从 kafka 只收到 4,然后它抛出,我不明白为什么.
    • 就像我说的,数据不是“普通的 Avro”——至少为 Schema Registry 数据保留了 5 个字节。 docs.confluent.io/platform/current/schema-registry/…
    • 你能不能说得更清楚些。我不明白我必须做什么才能获取整个消息。我已经更新了我的原始帖子。
    【解决方案2】:

    我通过定义我的自定义序列化程序解决了这个问题,从而实现了ISerializer&lt;T&gt;IDeserializer&lt;T&gt; 接口,它们的腹部只是System.Text.Json.JsonSerializerNewtonsoftJson 的包装器。

    序列化器

    public class MySerializer:ISerializer<T>
    {
         byte[] Serialize(T data, SerializationContext context)
         {
              var str=System.Text.Json.JsonSerializer.Serialize(data); //you can also use Newtonsoft here
              var bytes=Encoding.UTF8.GetBytes(str);
              return bytes;
         }
    }
    

    用法

       var config = new ConsumerConfig
                    {
                        GroupId = kafkaConfig.ConsumerGroup,
                        BootstrapServers = kafkaConfig.ServerUrl,
                        AutoOffsetReset = AutoOffsetReset.Earliest
                    };
        IConsumer<int,Currency> consumer = new ConsumerBuilder<int, Currency>(config)
                .SetValueDeserializer(new MySerializer<Currency>())
                .Build();
    
    ConsumeResult<int, Currency> result = consumer.Consume();
    

    附言

    在我实现了接口之后,我什至没有在这里使用模式注册表

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-06-20
      • 2017-02-12
      • 2018-05-27
      • 1970-01-01
      • 1970-01-01
      • 2020-03-04
      • 1970-01-01
      相关资源
      最近更新 更多