【问题标题】:Azure Function not picking up Eventhub eventsAzure 函数未接收 Eventhub 事件
【发布时间】:2021-07-17 17:18:34
【问题描述】:

我开始使用 Azure Functions,但遇到的问题是我的 Function 没有被进入我的 eventthub 的事件触发。

这是我的函数的代码:

host.json:

  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[2.*, 3.0.0)"
  }
}

function.json:

  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "events",
      "direction": "in",
      "eventHubName": "eventhub",
      "connection": "eventhub_connection",
      "cardinality": "many",
      "consumerGroup": "$Default",
      "dataType": "stream"
    }
  ]
}

init.py:

import logging

import azure.functions as func


def main(events: List[func.EventHubEvent]):
    for event in events:
        logging.info('Python EventHub trigger processed an event: %s',
                        event.get_body().decode('utf-8'))
        logging.info(f'Function triggered to process a message: {event.get_body().decode()}')
        logging.info(f'  EnqueuedTimeUtc = {event.enqueued_time}')
        logging.info(f'  SequenceNumber = {event.sequence_number}')
        logging.info(f'  Offset = {event.offset}')

# def main(event: func.EventHubEvent):
#     logging.info(f'Function triggered to process a message: {event.get_body().decode()}')
#     logging.info(f'  EnqueuedTimeUtc = {event.enqueued_time}')
#     logging.info(f'  SequenceNumber = {event.sequence_number}')
#     logging.info(f'  Offset = {event.offset}')

#     # Metadata
#     for key in event.metadata:
#         logging.info(f'Metadata: {key} = {event.metadata[key]}')
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=storageaccount;AccountKey=storageacciuntaccesskey=;EndpointSuffix=core.windows.net",
    "eventhub_connection": "Endpoint=sb://eventhub01.servicebus.windows.net/;SharedAccessKeyName=function;SharedAccessKey=0omitted;EntityPath=eventhub"
  }
}

我从 Azure Function Core 工具提供的基本 eventthub python 代码开始。并且一直在测试人们的博客和 Microsoft 文档的在线示例中的不同代码段。

当切换到基数时:一 -> 我切换到当前被注释掉的代码。我不知道是不是应该这样,我觉得很对。

在任何情况下,无论基数设置如何,或者在二进制、流或字符串之间更改数据类型。我的函数根本不会触发。

我可以查询我的 eventthub 并查看/阅读事件。所以我知道我的政策,以及共享密钥等,工作正常。我也只使用 $Default 消费者组。

我还尝试设置一个 HTTP 触发函数,该函数是从 Azure Monitor 触发的。我可以在日志中看到每个进入函数的请求。

我在 eventthub 函数的代码中做错了吗? 我是否缺少其他一些配置设置?我已经检查了该功能的访问规则,但这真的没关系吗?该函数正在从 eventthub 中提取事件。它不是由发起者发送数据。

编辑:添加了local.settings.json文件配置并更新了function.json 编辑 2:我的具体问题的解决方案在答案的 cmets 中。

【问题讨论】:

    标签: python-3.x azure-functions azure-eventhub


    【解决方案1】:

    更新:

    __init__.py 函数:

    from typing import List
    import logging
    
    import azure.functions as func
    
    
    def main(events: List[func.EventHubEvent]):
        for event in events:
            logging.info('Python EventHub trigger processed an event: %s',
                            event.get_body().decode('utf-8'))
    

    向事件中心发送消息:

    import asyncio
    from azure.eventhub.aio import EventHubProducerClient
    from azure.eventhub import EventData
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a connection string to your event hubs namespace and
        # the event hub name.
        producer = EventHubProducerClient.from_connection_string(conn_str="Endpoint=sb://testbowman.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx;EntityPath=test", eventhub_name="test")
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData('First event '))
            event_data_batch.add(EventData('Second event'))
            event_data_batch.add(EventData('Third event'))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    

    请确保您提供正确的事件中心名称:


    看来你的function.json有问题,连接字符串不应该直接放在绑定项中。

    应该是这样的:

    function.json

    {
      "scriptFile": "__init__.py",
      "bindings": [
        {
          "type": "eventHubTrigger",
          "name": "events",
          "direction": "in",
          "eventHubName": "test",
          "connection": "testbowman_RootManageSharedAccessKey_EVENTHUB",
          "cardinality": "many",
          "consumerGroup": "$Default",
          "dataType": "binary"
        }
      ]
    }
    

    local.settings.json

    {
      "IsEncrypted": false,
      "Values": {
        "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=0730bowmanwindow;AccountKey=xxxxxx;EndpointSuffix=core.windows.net",
        "FUNCTIONS_WORKER_RUNTIME": "python",
        "testbowman_RootManageSharedAccessKey_EVENTHUB": "Endpoint=sb://testbowman.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx;EntityPath=test"
      }
    }
    

    【讨论】:

    • 您好鲍曼,感谢您的回复。我将对此进行测试。我在哪里可以找到这些信息?我在 MS 文档中找不到它。另外我怎么知道 DefaultEndpointsProtocol、AccountName、AccountKey 和 EndpointSuffix 的值应该是什么?我假设这会在为该函数创建 HTTP 触发器、EventGrid 函数或其他类型的触发器时发生变化。
    • 直接搜索此配置设置将我带到这里:docs.microsoft.com/en-us/azure/azure-functions/… --> 它没有向我解释为什么设置不正确或为什么 eventthub 触发器需要它。但我明白价值观需要是什么。
    • @Marco 大多数触发器需要提供 AzureWebJobsStorage。可以提供this,也可以提供UseDevelopmentStorage=true(这是用sql数据库模拟的)。
    • @Marco 嗨,我的意思是事件中心的连接字符串。
    • 好的,多亏了你激发了我大脑的连接,我才弄明白了。我不知道 local.settings.json 包含 function.json 使用的变量。因此,我在上传更新的 local.settings.json 后从未检查过应用程序设置,并且在检查时看起来新变量从未添加到应用程序设置中。手动添加后,eventthub 触发器自动开始工作。
    猜你喜欢
    • 2020-04-26
    • 2016-05-28
    • 2017-03-28
    • 1970-01-01
    • 1970-01-01
    • 2022-11-22
    • 1970-01-01
    • 2014-11-07
    • 2020-09-02
    相关资源
    最近更新 更多