【Posted】:2019-12-02 17:52:32
【Question】:
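With Azure Event Hubs (eventhub), I am trying to receive data concurrently using the asyncio module, as already discussed in the question linked Here.
The problem I want to solve here is that a variable I fill inside the for loop is gone once I stop the loop with loop.stop().
The code is almost identical to the code in the link above.
The class is defined as follows: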
from azure.eventprocessorhost import AbstractEventProcessor

# Module-level list that collects the received message bodies.
list_ = []

class EventProcessor(AbstractEventProcessor):
    def __init__(self, params=None):
        super().__init__(params)
        self._msg_counter = 0

    async def open_async(self, context):
        print("Connection established {}".format(context.partition_id))

    async def close_async(self, context, reason):
        print("Connection closed (reason {}, id {}, offset {}, sq_number {})".format(
            reason,
            context.partition_id,
            context.offset,
            context.sequence_number))

    async def process_events_async(self, context, messages):
        for event_data in messages:
            last_offset = event_data.offset.value
            last_sn = event_data.sequence_number
            data = event_data.body_as_str(encoding='UTF-8')
            list_.append(data)
            print("Received data: {}, Num:{}".format(last_sn, len(list_)))
            if len(list_) == 10:
                self.loop.close()  ## <- it does not stop at len(list_) == 10
                #self.loop.stop()  ## <- it does stop, but "list_" has disappeared.

    async def process_error_async(self, context, error):
        print("Event Processor Error {!r}".format(error))
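As noted in the code comments above, neither self.loop.close() nor self.loop.stop() works the way I want.
The code that follows simply sets up the loop and runs it until the tasks complete: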
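To make the difference between the two calls concrete, here is a minimal sketch that uses only the standard asyncio module and no Event Hubs code. The names collected, close_errors and collect_then_stop are hypothetical stand-ins for list_ and process_events_async. It shows that calling close() on a loop that is still running raises RuntimeError, while stop() makes run_forever() return and leaves a module-level list intact. Whether this is what happens inside EventProcessorHost is an assumption, not something the post confirms.

```python
import asyncio

collected = []     # module-level list, standing in for list_
close_errors = []  # messages raised by loop.close() while the loop runs


async def collect_then_stop(loop, limit=10):
    """Hypothetical stand-in for process_events_async."""
    i = 0
    while True:
        collected.append(i)
        i += 1
        if len(collected) == limit:
            try:
                loop.close()  # not allowed while the loop is running
            except RuntimeError as exc:
                close_errors.append(str(exc))
            loop.stop()  # run_forever() returns after this iteration
            return
        await asyncio.sleep(0)  # yield control back to the loop


loop = asyncio.new_event_loop()
loop.create_task(collect_then_stop(loop))
loop.run_forever()   # returns once loop.stop() has been called
loop.close()         # safe now: the loop is no longer running

print(len(collected))     # 10
print(len(close_errors))  # 1
```

Note that this sketch uses run_forever(). With run_until_complete(), stopping the loop before the awaited future has finished raises RuntimeError in the caller, so code placed after that call would not run unless the exception is caught.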
import asyncio
from azure.eventprocessorhost import (
    AzureStorageCheckpointLeaseManager,
    EventHubConfig,
    EventProcessorHost,
    EPHOptions)

loop = asyncio.get_event_loop()

# Storage Account Credentials
STORAGE_ACCOUNT_NAME = "xxx"
STORAGE_KEY = "xxxx"
LEASE_CONTAINER_NAME = "xxx"
NAMESPACE = "xxx"
EVENTHUB = "xxx"
USER = "RootManageSharedAccessKey"
KEY = "xxxx"

# Eventhub config and storage manager
eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$default")
eh_options = EPHOptions()
eh_options.release_pump_on_timeout = True
eh_options.debug_trace = False
storage_manager = AzureStorageCheckpointLeaseManager(
    STORAGE_ACCOUNT_NAME, STORAGE_KEY, LEASE_CONTAINER_NAME)

# Event loop and host
host = EventProcessorHost(
    EventProcessor,
    eh_config,
    storage_manager,
    ep_params=["param1", "param2"],
    eph_options=eh_options,
    loop=loop)

# wait_and_close is not shown here; it comes from the linked code.
tasks = asyncio.gather(
    host.open_async(),
    wait_and_close(host))
loop.run_until_complete(tasks)
The final variable I want to get out of this is "list_".
【Discussion】:
Tags: azure streaming python-asyncio azure-eventhub
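One way to hand a collected list back to the caller without touching the loop directly is to signal completion with an asyncio.Event and let the main coroutine return the list. The sketch below uses only the standard library. fake_receiver, sink, done and main are hypothetical names, and the receiver merely simulates incoming events, so this illustrates the pattern rather than the Event Hubs API.

```python
import asyncio


async def fake_receiver(sink, done, limit):
    """Hypothetical stand-in for an event processor that fills a list."""
    n = 0
    while not done.is_set():
        sink.append("event-{}".format(n))
        n += 1
        if len(sink) >= limit:
            done.set()  # tell the waiter that enough data has arrived
        await asyncio.sleep(0)  # yield control back to the loop


async def main(limit=10):
    sink = []
    done = asyncio.Event()
    receiver = asyncio.ensure_future(fake_receiver(sink, done, limit))
    await done.wait()  # block until the receiver signals completion
    await receiver     # let the receiver finish cleanly
    return sink        # the list is returned instead of read from a global


result = asyncio.run(main())
print(len(result))  # 10
```

In the setup from the question, a coroutine such as wait_and_close(host) might await an event like this before closing the host, assuming that is where the host is shut down in the linked code.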