【问题标题】:How to set the EventProcessorHost to read events from now on (UTC)?如何设置 EventProcessorHost 从现在开始读取事件(UTC)?
【发布时间】:2016-02-26 03:33:30
【问题描述】:

我们正在使用 EventProcessorHost 从 Azure EventHubs 接收事件。我一直未能成功地尝试将它配置(通过 EventProcessorOptions.InitialOffsetProvider)从 UTC 现在读取事件,但它总是从提要的开头读取。我没有保存检查点(我什至删除了创建的 BLOB 容器)。 我是这样设置的:

DateTime startDate = DateTime.UtcNow;

var epo = new EventProcessorOptions
            {
                MaxBatchSize = 100, 
                PrefetchCount = 100, 
                ReceiveTimeOut = TimeSpan.FromSeconds(120),  
                InitialOffsetProvider = (name) => startDate  
            };

任何指导将不胜感激。

【问题讨论】:

  • 我没有过多地使用 EventHubs,但我已经在 GitHub 上运行了 Azure Connect the Dots 演示,并且在他们的演示中,它们没有像你一样声明变量。他们有 UtcNow 内联。例如: InitialOffsetProvider = (name) => DateTime.UtcNow github.com/Azure/connectthedots/blob/master/Azure/WebSite/…

标签: azure azure-eventhub


【解决方案1】:

认为这在 2.0.0 版本中发生了变化 - Rajiv 的代码现在是:

var eventProcessorOptions = new EventProcessorOptions
{
    InitialOffsetProvider = (partitionId) => EventPosition.FromEnqueuedTime(DateTime.UtcNow)
};

这是一个具有完全限定类名的示例块:

    private static async Task MainAsync(string[] args)
    {
        try{
            Console.WriteLine("Registering EventProcessor...");

            string AISEhConnectionStringEndpoint = Configuration["AISEhConnectionStringEndpoint"];
            string AISEhConnectionStringSharedAccessKeyName = Configuration["AISEhConnectionStringSharedAccessKeyName"];
            string AISEhConnectionStringSharedAccessKey = Configuration["AISEhConnectionStringSharedAccessKey"];
            string EhConnectionString = $"Endpoint={AISEhConnectionStringEndpoint};SharedAccessKeyName={AISEhConnectionStringSharedAccessKeyName};SharedAccessKey={AISEhConnectionStringSharedAccessKey}";
            string AISEhEntityPath = Configuration["AISEhEntityPath"];
            string AISEhConsumerGroupName = Configuration["AISEhConsumerGroupName"];
            string AISStorageContainerName = Configuration["AISStorageContainerName"];
            string AISStorageAccountName = Configuration["AISStorageAccountName"];
            string AISStorageAccountKey = Configuration["AISStorageAccountKey"];

            string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", AISStorageAccountName, AISStorageAccountKey);

            var eventProcessorHost = new Microsoft.Azure.EventHubs.Processor.EventProcessorHost(
                AISEhEntityPath,
                AISEhConsumerGroupName,
                EhConnectionString,
                StorageConnectionString,
                AISStorageContainerName);

            var options = new Microsoft.Azure.EventHubs.Processor.EventProcessorOptions
            {
                InitialOffsetProvider = (partitionId) => Microsoft.Azure.EventHubs.EventPosition.FromEnqueuedTime(DateTime.UtcNow)
            };

            // Registers the Event Processor Host and starts receiving messages
            await eventProcessorHost.RegisterEventProcessorAsync<GetEvents>(options);

            Thread.Sleep(Timeout.Infinite);

            // Disposes of the Event Processor Host
            await eventProcessorHost.UnregisterEventProcessorAsync();
        }
        catch(Exception ex)
        {
            Console.WriteLine(ex.Message);
            NLog.LogManager.GetCurrentClassLogger().Error(ex);
            throw;
        }
    }

}

这是我的常规设置,其中隐藏了秘密/确切地址以帮助解决问题,因为我发现解决这个问题比拔牙更不愉快:

"AISEhConnectionStringEndpoint": "sb://<my bus address>.servicebus.windows.net/",
"AISEhConnectionStringSharedAccessKeyName": "<my key name>",
"AISEhConnectionStringSharedAccessKey": "<yeah nah>",
"AISEhEntityPath": "<Event Hub entity path>",
"AISEhConsumerGroupName":  "<consumer group name e.g $Default>",
"AISStorageContainerName":  "<storage container name>",
"AISStorageAccountName": "<storage account name>",
"AISStorageAccountKey": "<yeah nah>",

【讨论】:

  • 你是用什么来识别EventPosition的?
  • 需要添加 nuget Microsoft.Azure.EventHubs 并将其用作命名空间。不幸的是,这个更新的代码仍然无法与我们的事件中心监听器在上周左右接收数据一起工作
  • 是的 - 这是正确的使用 - 我会用一些额外的信息更新评论。我确实发现弄清楚让事件中心连接的神奇咒语非常令人沮丧。如果您需要更多信息或想由我运行无密码的字符串,请告诉我。
  • @leighghunt:初始化事件处理器主机时是否还需要设置一个新的LeaseContainerName? github.com/Azure/azure-event-hubs-dotnet/issues/…
  • @mikebridge,是的,我从记忆中这么认为(我们搬到了 kafka,所以我没有工作环境来测试了)。如果您已经为给定的租用容器名称消耗了一些事件,那么从那时起对代码中初始偏移量的任何更改都将无效,因为它不再被使用。
【解决方案2】:

您可以为此使用 EventProcessorOptions 类并提供一个设置为所需时间的偏移量。

var eventProcessorOptions = new EventProcessorOptions
{
    InitialOffsetProvider = (partitionId) => DateTime.UtcNow
};

然后您可以使用任何接受eventProcessorOptionsRegisterEventProcessAsync 重载。

【讨论】:

    【解决方案3】:

    我发现 blob 中的检查点文件夹仍然存在,我的应用正在考虑这一点并忽略了我在 EventProcessorOptions 中设置的日期。在我删除容器后,它开始按预期运行(计算 UTC 日期)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-11-12
      • 1970-01-01
      • 1970-01-01
      • 2010-12-04
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多