【问题标题】:Kafka consumer commit thread-safetyKafka消费者提交线程安全
【发布时间】:2018-09-30 14:24:13
【问题描述】:

我正在使用Confluent.Kafka dotnet 客户端。

namespace Confluent.Kafka
{
    public class Consumer<TKey, TValue> : IDisposable
    {
         public Task<CommittedOffsets> CommitAsync();
    }
}

如您所见,Consumer.CommitAsync 是一个异步方法。在不等待其响应的情况下调用CommitAsync 方法然后再调用Subscribe 是否安全?

下面的示例代码。

using (var consumer = new Confluent.Kafka.Consumer<MessageKey, byte[]>(config, new MessageKeyDeserializer(), new ByteArrayDeserializer()))
{
                consumer.Subscribe(topics);

                while (true)
                {
                    Message<MessageKey, byte[]> msg;
                    if (consumer.Consume(out msg, TimeSpan.FromSeconds(1)))
                    {
                        // ...

                        if( msg.Offset % 100 == 0)
                        {
                            consumer.CommitAsync().ContinueWith((t) =>
                           {
                               // log t.Exception
                           }, TaskContinuationOptions.OnlyOnFaulted);
                        }
                    }
                }
}

【问题讨论】:

    标签: c# apache-kafka kafka-consumer-api confluent-platform


    【解决方案1】:

    我假设你想说下一次调用 Consume

    是的,这是安全的,这没问题。 我还会为提交添加一些时间窗口(例如,在 5s 和 100msgs 之间首先出现),这样如果您有一段时间没有收到消息,您仍然可以提交它们

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-01-31
      • 2017-03-19
      • 1970-01-01
      • 1970-01-01
      • 2021-06-14
      • 2017-08-22
      相关资源
      最近更新 更多