【问题标题】:Kafka, Confluent client, Offset卡夫卡,汇合客户端,偏移量
【发布时间】:2019-07-27 01:56:55
【问题描述】:

我有一些关于在 Confluent Kafka 客户端 API for .NET 中使用偏移量的问题。不幸的是,答案并不明显,文档对我没有帮助。 如何检测偏移量不存在(根据保留规则可能已被删除)? 如何检测到已删除消息的偏移点(根据保留规则)? 如何找到第一条现有消息并将偏移量设置为它? 提前致谢。

【问题讨论】:

    标签: .net apache-kafka offset kafka-consumer-api confluent-platform


    【解决方案1】:

    你只需要设置auto.offset.reset:

    当 Kafka 中没有初始偏移量或当前 服务器上不再存在偏移量(例如因为该数据 已删除):

    最早:自动将偏移量重置为 最早的偏移量

    最新:自动将偏移量重置为最新 偏移量

    none: 如果没有先前的偏移量,则向消费者抛出异常 为消费者组找到

    其他任何东西: 向 消费者。

    例如:

    using System;
    using System.Threading;
    using Confluent.Kafka;
    
    class Program
    {
        public static void Main(string[] args)
        {
            var conf = new ConsumerConfig
            { 
                GroupId = "test-consumer-group",
                BootstrapServers = "localhost:9092",
                // Set auto.offset.reset to `earliest`
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
    
            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe("my-topic");
    
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => {
                    e.Cancel = true; // prevent the process from terminating.
                    cts.Cancel();
                };
    
                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occured: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Ensure the consumer leaves the group cleanly and final offsets are committed.
                    c.Close();
                }
            }
        }
    }
    

    【讨论】:

    • 我觉得这回答了最后一个问题,但不是如何“检测”该组将被重置
    猜你喜欢
    • 2018-04-04
    • 2020-06-04
    • 1970-01-01
    • 2019-05-18
    • 1970-01-01
    • 2016-10-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多