【问题标题】:How to fetch events from eventhub from a timer enabled azure function?如何从启用计时器的天蓝色函数中从 eventthub 中获取事件?
【发布时间】:2020-06-25 03:39:45
【问题描述】:

我正在使用 Azure 事件中心。我的要求是每天使用 azure 函数从 Azure 事件中心获取事件。基本上我的天蓝色功能将启用计时器。它应该能够从 azure 事件中心获取数据。有这个机制吗? 我知道,只要在事件中心收到事件,我们就可以触发 azure 函数。这是我不想要的,因为该函数将执行 n 次。我只想每天获取事件。

【问题讨论】:

    标签: azure azure-functions azure-eventhub


    【解决方案1】:

    您仍然可以创建计时器触发函数并在代码中创建消费者客户端来接收事件。请参阅下面的示例代码。如果您有任何问题,请告诉我。

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Azure.Messaging.EventHubs.Consumer;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Extensions.Logging;
    
    namespace FunctionApp7
    {
        public static class Function1
        {
            const string EventHubsConnectionString = "your connection string";
            const string EventHubName = "evethub name";
            const string ConsumerGroupName = "cgname";
    
            [FunctionName("Function1")]
            public static void Run([TimerTrigger("0 */5 * * * *")]TimerInfo myTimer, ILogger log)
            {
                log.LogInformation($"C# Timer trigger function executed at: {DateTime.Now}");
    
                // You better dissover partitions by eventhub client. I am just hardcoding them here for now.
                var partitions = new List<string> { "0", "1" };
                var receiveTasks = new List<Task>();
    
                foreach(var p in partitions)
                {
                    receiveTasks.Add(ReadEventsFromPartition(p));
                }
    
                // Wait until all reads complete.
                Task.WhenAll(receiveTasks);
            }
    
            public static async Task ReadEventsFromPartition(string partitionId)
            {
                await using (var consumer = new EventHubConsumerClient(ConsumerGroupName, EventHubsConnectionString, EventHubName))
                {
                    EventPosition startingPosition = EventPosition.FromOffset(CheckpointStore.ReadOffsetForPartition(partitionId));
    
                    long lastOffset = -1;
    
                    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition))
                    {
                        // Process received events here.
    
                        // Break if no events left.
                        if (receivedEvent.Data == null)
                        {
                            break;
                        }
    
                        lastOffset = receivedEvent.Data.Offset;
                    }
    
                    // Persist last event's offset so we can continue reading from this position next time function is triggered.
                    if (lastOffset != -1)
                    {
                        // Write offset into some durable store.
                        CheckpointStore.WriteOffsetForPartition(partitionId, lastOffset);
                    }
                }
            }
        }
    }
    

    【讨论】:

    • 你可以在这里找到 JS 的代码 sn-ps - docs.microsoft.com/en-us/azure/event-hubs/…
    • 是的,但这会从 blobstorage 中读取数据,并将其用作检查点存储。我需要使用计时器触发的 azure 函数定期从 eventthub 直接准备好。这可能吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-12-01
    • 1970-01-01
    • 1970-01-01
    • 2020-06-30
    • 1970-01-01
    • 1970-01-01
    • 2023-03-23
    相关资源
    最近更新 更多