【问题标题】:Azure EventHub, unable to subscribe for ReceivingAzure EventHub,无法订阅接收
【发布时间】:2016-05-28 05:24:34
【问题描述】:

我尝试通过 Python 从 Azure EventHub 接收消息,但遗憾的是我无法订阅它。

我的脚本基于https://gist.github.com/tomconte/e2a4667185a9bf674f59 并且已经在python script which subscribes/listens to Azure Event Hub? 中提出了另一个类似的问题,不幸的是没有解决它。

到我的设置: Python 2.7.9 (Ubuntu 15.04)

通过 pip 安装 qpid-proton:

pip show python-qpid-proton
...
Version: 0.11.1
...

所以我正在尝试以下方法:

from proton import *
import urllib
key = urllib.quote(FOOBAR,"")
address = "amqps://name:" + key + "@nsname.servicebus.windows.net/eventhubname/ConsumerGroups/$Default/Partitions/0"
messenger = Messenger()
messenger.subscribe(address)

proton.MessengerException: Cannot subscribe to [ADDRESS]

name/key 应该没问题,因为它可以在另一个应用程序中使用。

有什么猜测吗?

【问题讨论】:

  • 嗨,似乎没有必要对密钥进行编码。请尝试使用 Azure 门户中的原始密钥。任何结果,请告诉我。
  • 感谢您的回复。尝试不编码密钥但也失败了。在我的密钥中有一个“/”导致连接失败:proton.MessengerException: [-2]: CONNECTION ERROR (name:KeyUntilSlash): getaddrinfo(name, KeyUntilSlash): Servname not supported for ai_socktype 其中 KeyuntilSlash 是密钥的第一部分,不包括“/”

标签: python azure amqp qpid azure-eventhub


【解决方案1】:

您的密钥看起来可能包含“/”,因此您可能需要转到 azure 门户,看看是否可以改用辅助密钥。您可能需要创建一个新的“共享访问策略”

key shared access policy

【讨论】:

    【解决方案2】:

    另一种选择是使用最新的 azure-eventhub Python SDK 从事件中心接收消息。

    azure-eventhub 在 pypi 上可用:https://pypi.org/project/azure-eventhub/

    您可以关注receive sample code接收消息:

    #!/usr/bin/env python
    
    # --------------------------------------------------------------------------------------------
    # Copyright (c) Microsoft Corporation. All rights reserved.
    # Licensed under the MIT License. See License.txt in the project root for license information.
    # --------------------------------------------------------------------------------------------
    
    """
    An example to show receiving events from an Event Hub.
    """
    import os
    from azure.eventhub import EventHubConsumerClient
    
    CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
    EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']
    
    
    def on_event(partition_context, event):
        # Put your code here.
        # If the operation is i/o intensive, multi-thread will have better performance.
        print("Received event from partition: {}.".format(partition_context.partition_id))
    
    
    def on_partition_initialize(partition_context):
        # Put your code here.
        print("Partition: {} has been initialized.".format(partition_context.partition_id))
    
    
    def on_partition_close(partition_context, reason):
        # Put your code here.
        print("Partition: {} has been closed, reason for closing: {}.".format(
            partition_context.partition_id,
            reason
        ))
    
    
    def on_error(partition_context, error):
        # Put your code here. partition_context can be None in the on_error callback.
        if partition_context:
            print("An exception: {} occurred during receiving from Partition: {}.".format(
                partition_context.partition_id,
                error
            ))
        else:
            print("An exception: {} occurred during the load balance process.".format(error))
    
    
    if __name__ == '__main__':
        consumer_client = EventHubConsumerClient.from_connection_string(
            conn_str=CONNECTION_STR,
            consumer_group='$Default',
            eventhub_name=EVENTHUB_NAME,
        )
    
        try:
            with consumer_client:
                consumer_client.receive(
                    on_event=on_event,
                    on_partition_initialize=on_partition_initialize,
                    on_partition_close=on_partition_close,
                    on_error=on_error,
                    starting_position="-1",  # "-1" is from the beginning of the partition.
                )
        except KeyboardInterrupt:
            print('Stopped receiving.')
    

    【讨论】:

      猜你喜欢
      • 2017-03-28
      • 2022-01-15
      • 2021-07-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-10-23
      • 1970-01-01
      相关资源
      最近更新 更多