【问题标题】:How to handle multiple records in Kafka Consumer?如何处理 Kafka Consumer 中的多条记录?
【发布时间】:2019-07-16 06:07:42
【问题描述】:

您好,我正在开发 Confluent Kafka Consumer。我的经纪人有多个记录。我现在想处理所有的记录。下面是我的消费者实现。

public ConsumeResult<string, GenericRecord> Consume(string topic)
    {
      ConsumeResult<string, GenericRecord> result;
      try
      {
        result = consumer.Consume();
        Commit(result);
        return result;
      }
      catch (Exception e)
      {
        this.logger.Error("KafkaClient", $"Error sending message '{e.Message}'");
        return null;
      }
    }

如果 Broker 中有多个记录,那么我将使用 GenericRecord 获得一个事件/消息。如果有多个记录那么如何有效地处理消费者呢?任何帮助,将不胜感激。谢谢

【问题讨论】:

    标签: c# .net apache-kafka confluent-platform


    【解决方案1】:

    你只会循环。看例子

    https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/AvroGeneric/Program.cs

    while (true)
    {
        try
        {
            var consumeResult = consumer.Consume(cts.Token);
    
            Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"Consume error: {e.Error.Reason}");
        }
    }
    

    【讨论】:

    • 感谢您的回答。如果有多条记录,那么如果我放while循环,那么它会获取多条记录对吗?
    • 它会继续消耗,是的。是否获得更多记录,取决于 Producer
    • 所以当执行 var consumeResult = consumer.Consume(cts.Token);它只会消耗一条记录吗? nd 在下一个循环中再次消耗第二条记录?我说的对吗?
    • 谢谢。这是我的消费者配置。 BootstrapServers = kafkaConfig.BootstrapServers, GroupId = "ProductEvent" 我需要添加任何其他设置吗?我想读取上次读取的数据。
    • 我只是在添加while循环后调试了我的代码。我的经纪人有多个记录。但只返回了一条记录。我可以看到偏移量 5(我添加数据的 5 次)。我添加了屏幕截图。
    猜你喜欢
    • 2019-10-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-10-31
    • 2016-06-29
    • 1970-01-01
    • 2022-01-12
    相关资源
    最近更新 更多