【问题标题】:How to receive latest message from Eventhub in c#如何在 c# 中接收来自 Eventhub 的最新消息
【发布时间】:2019-11-14 10:44:43
【问题描述】:
我想接收来自 eventthub 的最新消息,因为之前有太多消息发送到 eventthub。
我设置了自己的消费组,但似乎只能从早到现在接收消息。
有什么方法可以让消息跳到最新的吗?
【问题讨论】:
标签:
c#
azure
azure-eventhub
【解决方案1】:
如果你知道 azure eventthubs 中的checkpoint,这应该很容易实现。
简而言之,checkpoint是一个存储在azure blob storage中的文件,每次从eventthub读取数据时,都会在checkpoint中记录偏移量。并且下次使用同一个 checkpoint 从 eventthubs 读取数据时,它会从 offset 开始读取。
所以如果你想跳过读取旧数据,你可以先创建一个像this这样的测试接收器项目并设置检查点。然后下一次,在你的生产中,你可以使用相同的检查点,因为偏移量,它总是会跳过旧数据。
另一种方法是,您可以使用EventProcessorOptions 和RegisterEventProcessorAsync 方法。当您选择使用此方法时,您需要手动从 azure blob 存储中移除检查点,否则此设置将被检查点覆盖。
下面的示例,在您的receiver method:
private static async Task MainAsync(string[] args)
{
Console.WriteLine("Registering EventProcessor...");
var eventProcessorHost = new EventProcessorHost(
EventHubName,
PartitionReceiver.DefaultConsumerGroupName,
EventHubConnectionString,
StorageConnectionString,
StorageContainerName);
//here, you can get only the data sent in event hub in the recent an hour.
var options = new EventProcessorOptions
{
InitialOffsetProvider = (partitionId) => EventPosition.FromEnqueuedTime(DateTime.UtcNow.AddHours(-1))
};
// Registers the Event Processor Host and starts receiving messages
await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options);
Console.WriteLine("Receiving. Press ENTER to stop worker.");
Console.ReadLine();
// Disposes of the Event Processor Host
await eventProcessorHost.UnregisterEventProcessorAsync();
}