【Question title】: Receiving messages using Asyncio (Asynchronously) and get the variable as final result
【Posted】: 2019-12-02 17:52:32
【Question】:

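In Event Hub, I am trying to receive data concurrently using the asyncio module, as discussed here.

The problem I am trying to solve: a variable that I fill inside the for loop is gone once the event loop is stopped with loop.stop().

The code is almost identical to the one in the link above.

The class is defined as follows: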

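# Module-level list shared with the event processor
# (a `global` statement at module level has no effect, so it is omitted)
list_ = []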

class EventProcessor(AbstractEventProcessor):

    def __init__(self, params=None):       
        super().__init__(params)
        self._msg_counter = 0

    async def open_async(self, context):        
        print("Connection established {}".format(context.partition_id))

    async def close_async(self, context, reason):

        print("Connection closed (reason {}, id {}, offset {}, sq_number {})".format(
            reason,
            context.partition_id,
            context.offset,
            context.sequence_number))

    async def process_events_async(self, context, messages):

        for event_data in messages:
            last_offset = event_data.offset.value
            last_sn = event_data.sequence_number
            data = event_data.body_as_str(encoding='UTF-8')
            list_.append(data)
            print("Received data: {}, Num:{}".format(last_sn, len(list_)))

            if len(list_) == 10:
                self.loop.close()   ## <- it does not stop at len(list_) == 10
                #self.loop.stop()    ## <- it does stop but the "list_" has disappeared.


    async def process_error_async(self, context, error):

        print("Event Processor Error {!r}".format(error))

As noted in the comments above, neither self.loop.close() nor self.loop.stop() behaves the way I want.
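To illustrate what loop.stop() does in general, here is a minimal stdlib-only sketch. It does not use EventProcessorHost, and `collected` and `producer` are hypothetical names standing in for `list_` and `process_events_async`. It suggests that a module-level list is not cleared when the loop stops. Instead, run_until_complete() raises RuntimeError because its future never finished, which may be why the list seemed to vanish if the script ended on that exception.

```python
import asyncio

collected = []  # module-level list, like list_ in the question


async def producer(loop):
    # Hypothetical stand-in for process_events_async: appends items forever.
    i = 0
    while True:
        collected.append(i)
        i += 1
        if len(collected) == 10:
            loop.stop()  # takes effect once the current loop iteration ends
        await asyncio.sleep(0)  # yield control back to the event loop


loop = asyncio.new_event_loop()
task = loop.create_task(producer(loop))
stop_error = None
try:
    loop.run_until_complete(task)
except RuntimeError as exc:
    # Raised because the loop stopped before the task completed.
    stop_error = exc

# Cancel the still-pending task so the loop can be closed cleanly.
task.cancel()
try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass
loop.close()

# The list is still populated after the loop has stopped and closed.
print(len(collected), collected)
```

If the real script behaves the same way, wrapping loop.run_until_complete(tasks) in a try/except RuntimeError would let the code after it read `list_`. Whether the host shuts down cleanly in that case is not something this sketch can show.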

The rest of the code simply runs the loop until the tasks complete:

loop = asyncio.get_event_loop()

# Storage Account Credentials
STORAGE_ACCOUNT_NAME = "xxx"
STORAGE_KEY = "xxxx"
LEASE_CONTAINER_NAME = "xxx"
NAMESPACE = "xxx"
EVENTHUB = "xxx"
USER = "RootManageSharedAccessKey"
KEY = "xxxx"

# Eventhub config and storage manager 
eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$default")
eh_options = EPHOptions()
eh_options.release_pump_on_timeout = True
eh_options.debug_trace = False
storage_manager = AzureStorageCheckpointLeaseManager(STORAGE_ACCOUNT_NAME, STORAGE_KEY, LEASE_CONTAINER_NAME)

# Event loop and host
host = EventProcessorHost(
    EventProcessor,
    eh_config,
    storage_manager,
    ep_params=["param1","param2"],
    eph_options=eh_options,
    loop=loop)



tasks = asyncio.gather(
    host.open_async(),
    wait_and_close(host))
loop.run_until_complete(tasks)

The final variable I want to get out of this is "list_".

【Comments】:

    Tags: azure streaming python-asyncio azure-eventhub


    【Solution 1】:

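    So far I have not found a way to stop the loop as soon as the condition is met, but as a workaround we can limit how many events are appended to list_.

    Here is an example: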

    import asyncio
    
    from azure.eventprocessorhost import (
        AbstractEventProcessor,
        AzureStorageCheckpointLeaseManager,
        EventHubConfig,
        EventProcessorHost,
        EPHOptions
    )
    
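    # Module-level list that collects the received event bodies
    # (a `global` statement at module level has no effect, so it is omitted)
    list_ = []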
    
    class EventProcessor(AbstractEventProcessor):
        """
            Example implementation of AbstractEventProcessor
        """
        def __init__(self, params=None):
            """
            Init Event processor
            """
    
            super().__init__(params)
            self._msg_counter = 0
    
        async def open_async(self, context):
            """
            Called by processor host to initialize the event processor.
            """
            print("Connection established {}".format(context.partition_id))
    
        async def close_async(self, context, reason):
            """
            Called by processor host to indicate that the event processor is being stopped.
            :param context: Information about the partition
            :type context: ~azure.eventprocessorhost.PartitionContext
            """
            print("Connection closed (reason {}, id {}, offset {}, sq_number {})".format(
                reason,
                context.partition_id,
                context.offset,
                context.sequence_number))
    
        async def process_events_async(self, context, messages):
            """
            Called by the processor host when a batch of events has arrived.
            This is where the real work of the event processor is done.
            :param context: Information about the partition
            :type context: ~azure.eventprocessorhost.PartitionContext
            :param messages: The events to be processed.
            :type messages: list[~azure.eventhub.common.EventData]
            """
    
            for m in messages:
                data = m.body_as_str()          
                print("Received data: {}".format(data))
                if len(list_) < 10:
                    list_.append(data)
    
    
        async def process_error_async(self, context, error):
            """
            Called when the underlying client experiences an error while receiving.
            EventProcessorHost will take care of recovering from the error and
            continuing to pump messages, so no action is required.
            :param context: Information about the partition
            :type context: ~azure.eventprocessorhost.PartitionContext
            :param error: The error that occurred.
            """
            print("Event Processor Error {!r}".format(error))
    
    async def wait_and_close(host):
        """
        Run EventProcessorHost for 10 seconds, then shut down.
        """
        await asyncio.sleep(10)
        await host.close_async()
    
    
    loop = asyncio.get_event_loop()
    
    # Storage Account Credentials
    STORAGE_ACCOUNT_NAME = "xxx"
    STORAGE_KEY = "xxx"
    LEASE_CONTAINER_NAME = "xxx"
    NAMESPACE = "xxxx"
    EVENTHUB = "xxx"
    USER = "RootManageSharedAccessKey"
    KEY = "xxx"
    
    # Eventhub config and storage manager 
    eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$default")
    eh_options = EPHOptions()
    eh_options.release_pump_on_timeout = True
    eh_options.debug_trace = False
    storage_manager = AzureStorageCheckpointLeaseManager(
        STORAGE_ACCOUNT_NAME, STORAGE_KEY, LEASE_CONTAINER_NAME)
    
    # Event loop and host
    host = EventProcessorHost(
        EventProcessor,
        eh_config,
        storage_manager,
        ep_params=["param1","param2"],
        eph_options=eh_options,
        loop=loop)
    
    tasks = asyncio.gather(
        host.open_async(),
        wait_and_close(host))
    loop.run_until_complete(tasks)   
    
    print("*the length of list_*")
    print(len(list_))
    print("***the value from list_ variable***")
    print(list_)
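The workaround above caps the list at 10 items but still waits the full fixed sleep before closing. A possible refinement, sketched here with the standard library only, is to have the processor set an asyncio.Event once enough messages have arrived and let wait_and_close wait on that event instead of sleeping. `FakeHost`, `TARGET`, `collected`, and `process_events` are hypothetical names for illustration. `FakeHost` is not the Azure API, and this has not been verified against the real EventProcessorHost.

```python
import asyncio

collected = []
TARGET = 10  # hypothetical threshold, mirroring the 10 used in the answer


class FakeHost:
    """Hypothetical stand-in for EventProcessorHost; not the Azure API."""

    def __init__(self, on_batch):
        self._on_batch = on_batch
        self._closed = False

    async def open_async(self):
        # Deliver batches of three fake messages until the host is closed.
        n = 0
        while not self._closed:
            await self._on_batch(["msg-{}".format(n + k) for k in range(3)])
            n += 3
            await asyncio.sleep(0)  # let other tasks run

    async def close_async(self):
        self._closed = True


async def main():
    done = asyncio.Event()

    async def process_events(messages):
        # Same cap as in the answer: never keep more than TARGET items.
        for m in messages:
            if len(collected) < TARGET:
                collected.append(m)
        if len(collected) >= TARGET:
            done.set()  # signal that enough data has been collected

    async def wait_and_close(host):
        await done.wait()  # wait for the signal instead of a fixed sleep
        await host.close_async()

    host = FakeHost(process_events)
    await asyncio.gather(host.open_async(), wait_and_close(host))
    return collected


result = asyncio.run(main())
print(len(result), result)
```

In the real setup the event would have to be reachable from the EventProcessor class (for example as a module-level object created for the same loop), which is an assumption this sketch does not test.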
    

