【发布时间】:2021-07-29 08:43:23
【问题描述】:
我正在使用KsqlDb 表格,表格如下:
KSQL-DB 查询create table currency (id integer,name varchar) with (kafka_topic='currency',partitions=1,value_format='avro');
C# 模型
public class Currency
{
public int Id{get;set;}
public string Name{get;set;}
}
现在我想知道我应该如何使用 Confluent 库在 C# 中从这个主题写入/读取数据:
写作
IProducer<int, Currency> producer=....
Currency cur=new Currency();
Message<int,Currency> message = new Message<int, Currency>
{
Key = msg.Id,
Timestamp = new Timestamp(DateTime.UtcNow, TimestampType.CreateTime),
Value = msg
};
DeliveryResult<int,Currency> delivery = await this.producer.ProduceAsync(topic,message);
阅读
IConsumer<int,Currency> iconsumer = new ConsumerBuilder<int, Currency>(config)
.SetKeyDeserializer(Deserializers.Int32) //i assume i need to use the id from my dto
.SetValueDeserializer(...) //what deserializer
.Build();
ConsumeResult<int,Currency> result = consumer.Consume();
Currency message = // what deserializer JsonSerializer.Deserialize<Currency>(result.Message.Value);
我不知道该怎么做,所以我尝试寻找序列化程序。我找到了这个库AvroSerializer,但我不知道作者从哪里获取schema。
关于如何读取/写入与我的ksqldb 模型匹配的特定主题的任何帮助?
更新
经过一些研究和一些答案,我已经开始使用schemaRegistry
var config = new ConsumerConfig
{
GroupId = kafkaConfig.ConsumerGroup,
BootstrapServers = kafkaConfig.ServerUrl,
AutoOffsetReset = AutoOffsetReset.Earliest
};
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = kafkaConfig.SchemaRegistryUrl
};
var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
IConsumer<int,Currency> consumer = new ConsumerBuilder<int, Currency>(config)
.SetKeyDeserializer(new AvroDeserializer<int>(schemaRegistry).AsSyncOverAsync())
.SetValueDeserializer(new AvroDeserializer<Currency>(schemaRegistry).AsSyncOverAsync())
.Build();
ConsumeResult<int, Currency> result = consumer.Consume();
现在我收到另一个错误:
期望长度为 5 字节或更长但总数据大小的数据帧 是 4 个字节
正如有人好心指出的那样,我似乎只从模式注册表中检索 id。
我怎样才能:insert into currency (id,name) values (1,3) 并在 C# 中将其作为 POCO 检索(如上所列)?
更新 2
在我找到这个source 程序后,似乎由于某种原因我无法将消息发布到表中。
发送消息没有错误但没有发布到Kafka。
【问题讨论】:
标签: c# serialization avro ksqldb confluent-kafka-dotnet