【问题标题】:Add a coroutine after some coroutines have finished, in an already running event loop (same loop, same thread)在一些协程完成后添加一个协程,在一个已经运行的事件循环中(同一个循环,同一个线程)
【发布时间】:2022-01-11 04:16:52
【问题描述】:

我已经阅读了一些关于 SO 的类似问题,为了向已经运行的事件循环添加协程,一些答案是针对所提出的问题量身定制的,因此不适用于我的情况;最常见的是asyncio.ensure_future(coro(), loop=my_running_loop),或者对于线程安全版本,它将是asyncio.run_coroutine_threadsafe(coro(), my_running_loop);最后的手段是在当前循环结束后生成另一个循环。

我将首先提出我的问题:

  1. 为什么前两种方法不起作用?
  2. 除了第三种方法之外,还有其他方法吗,最好在类中完成,这样我就不必在类固有的循环完成后启动另一个循环?

我的场景的一个简化示例是,有 1000 个项目位于 REST-API 的另一端,我必须将它们全部拉出来进行一些分析。但是 REST-API 取决于网络连接,因此我可能会间歇性地遇到一些项目只是超时错误(或为空);在这种情况下,我必须再次调用 REST-API 来获取那些失败的。

只是为了演示我的场景,没有所有粒度(数据特定而不是代码逻辑):

import asyncio
import ItemType  # Enum
import async_rest_api
import pandas as pd

class DataSource:
    def __init__(self):
        # some settings on self
        # particularly:
        # self.loop = asyncio.get_event_loop()
        # self.item_names_to_process = some_df  # pd.DataFrame of one column named 'name'
        # self.unprocessed_item_names = []
        # self.result = pd.DataFrame()
        # super().__init__()

    def get_item_fetching_function(item_type: ItemType):
        if item_type == ItemType.One:
           return async_rest_api.get_item_type_one
        if item_type == ItemType.Two:
           return async_rest_api.get_item_type_two
        if item_type == ItemType.Three:
           return async_rest_api.get_item_type_three

    # args were of length greater than one, 
    # here I simplified it to just item_type and item_names.    
    async def get_items(self, item_type, item_names):  
        step_size = 500
        results = []

        for i in range(0, len(item_names), step_size):
            tasks = []
            for name in item_names[i:i+step_size]:
                tasks.append(self.get_item_fetching_function(item_type)(name))
            results.extend(await asyncio.gather(*tasks, return_exception=True))


        result = pd.concat([r[1] for r in results])
        processed_items_names = pd.DataFrame(results.name.unique(), columns=['name'])
        unprocessed = self.item_names_to_process.append(processed_items_names).drop_duplicates(keep=False)
        self.unprocessed = unprocessed.name.tolist()
        self.result = pd.concat([self.result, result])
        
        # Trial: I wrote another line here to add 
        # a coroutine to process the unprocessed items. 
        # I tried both in-thread and cross-thread, 
        # neither have worked and no error was thrown; 
        # the code just finished silently.

        # in the running thread

        asyncio.ensure_future(self.get_items(item_type, self.unprocessed), loop=self.loop)

        # or in another thread
   
        asyncio.run_coroutine_threadsafe(self.get_items(item_type, self.unprocessed), self.loop)


    def wrapper_function(self, item_type):
        self.loop.run_until_complete(self.get_items(item_type, self.item_names_to_process))
    
    if __names__ == "__main__":
         import DataSource
         import ItemType
         import asyncio
         data_source = DataSource()
         names = ''.join(random.choices(string.ascii_uppercase + string.digits, k=1000))
         data_source.wrapper_function(ItemType.One)
         # Trial: the last resort would be to check on the `self.unprocessed` here, if it has values, I'll re-run `self.get_items`. This method worked, as expected.
         if len(data_source.unprocessed):
            asyncio.run(data_source.get_items(ItemType.One, data_source.unprocessed))
         

【问题讨论】:

    标签: python multithreading async-await concurrency python-asyncio


    【解决方案1】:

    只需将asyncio.gather 替换为您可以更好地控制的构造。

    它是这样的:当你调用一个协程函数时,你会得到一个协程对象。这个对象可以

    • (1) 直接等待:您的代码暂停并以线性方式等待其完成:因此这不适合并行启动任务。
    • (2) 被提升为“任务”,它们,每当您的代码屈服于运行循环时,它将单步执行所有现有任务,然后再返回您的代码。

    asyncio.gather 的作用在于它做了两件事:将所有尚未成为任务的对象提升为任务,并且当gather等待调用本身。

    您可以保持相同的设计,只需将行 results.extend(await asyncio.gather(*tasks, return_exception=True)) 移动到 i 上的循环内(并在 name 上的循环下方):这将等待一整批“step_size”的任务“大小,直到它们结束—— 或者,您可以明确地创建任务,并使用asyncio.wait (https://docs.python.org/3/library/asyncio-task.html#asyncio.wait) 在解析中检索结果,控制在添加下一批任务之前暂停多少。 (这将需要一些巧妙的重新设计 - 只需为每个 setp_size 批次调用 gather 会容易得多)

    【讨论】:

    • 感谢您的帮助。为了确保我正确理解您,我们基本上重新确定任务的范围并让它们首先完成(一旦我将该行移动到 i 循环内并低于 name 循环),一旦它们完成,我可以询问循环例如,通过使用ensure_future 添加协程来再完成一项任务(在我对从一批检索到的数据进行必要的后期处理之后)。这样我就不需要在课堂外启动另一个循环了。
    • 非常抱歉,在查看我的代码时,我发现自己在示例中放错了 asyncio.gather 语句。我刚刚更新了代码,我实际所做的确实是我在上面的评论中描述的。就像你说的,该语句已经在 i 循环和名称循环下。你让我注意到我没有await我的ensure_future,这正是什么都没发生的原因!非常感谢。
    • 关于async.wait 我发现我可能可以使用return_when=FIRST_EXCEPTION 来控制我想暂停多少并检查异常。准备好后,我可以使用引发异常的项目恢复数据检索。
    猜你喜欢
    • 2017-01-05
    • 1970-01-01
    • 2015-03-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-02-19
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多