【问题标题】:Simplify nested asyncio operations for string modification function简化字符串修改函数的嵌套异步操作
【发布时间】:2022-01-17 02:14:03
【问题描述】:

我有一个如下所示的异步代码:

  • 有一个第三方函数对字符串执行一些操作并返回一个修改后的字符串,就这个问题而言,它类似于non_async_func

  • 我有一个 async def async_func_single 函数,它环绕着执行单个操作的 non_async_func

  • 然后是另一个 async def async_func_batch 函数,它嵌套包裹在 async_func_single 周围,为一批数据执行该函数。

代码类型的作品,但我想知道更多关于为什么/如何,我的问题是

  • 是否有必要创建async_func_single 并让async_func_batch 环绕它?

  • 我可以直接在async_func_batch中输入一批数据来调用non_async_func吗?

  • 我有一个 per_chunk 函数可以批量输入数据,是否有任何异步操作/函数可以避免使用预批处理我要发送到 async_func_batch 的数据?

import nest_asyncio
nest_asyncio.apply()

import asyncio
from itertools import zip_longest

from loremipsum import get_sentences

def per_chunk(iterable, n=1, fillvalue=None):
  args = [iter(iterable)] * n
  return zip_longest(*args, fillvalue=fillvalue)

def non_async_func(text):
  return text[::-1]

async def async_func_single(text):
  # Perform some string operation.
  return non_async_func(text)

async def async_func_batch(batch):
  tasks = [async_func_single(text) for text in batch]
  return await asyncio.gather(*tasks)

# Create some random inputs
thousand_texts = get_sentences(1000)

# Loop through 20 sentence at a time.
for batch in per_chunk(thousand_texts, n=20):  
  loop = asyncio.get_event_loop()
  results = loop.run_until_complete(async_func_batch(batch))
  for i, o in zip(thousand_texts, results):
    print(i, o)

【问题讨论】:

    标签: python string parallel-processing python-asyncio coroutine


    【解决方案1】:

    请注意,将您的函数标记为“async def”而不是“def”不会使它们自动异步 - 您可以拥有同步的“async def”函数。异步函数和同步函数的区别在于异步函数定义了等待另一个异步函数或等待异步 IO 操作的位置(使用“await”)。

    还要注意 asyncio 不是魔术 - 它基本上是一个调度程序,根据“等待”的函数/操作是否已完成来调度要运行的异步函数。而且,由于调度程序和异步函数都在一个线程上运行,因此在任何给定时刻,只能运行一个异步函数。

    所以,回到你的代码,你的“async_func_single”函数唯一要做的就是调用一个同步函数,因此,尽管被标记为“async def”,它仍然是一个同步函数。同样的逻辑也适用于“async_func_batch”函数——传递给“asyncio.gather”的“async_func_single”任务都是同步的,所以“asyncio.gather”只是同步运行每个任务(所以它没有提供任何好处通过一个简单的 for 循环等待每个任务),所以“async_func_batch”又是一个同步函数。因为您只是调用同步函数,所以 asyncio 不会为您的程序带来任何好处。

    如果您想要多个同时运行的同步函数,则不要使用异步函数。您需要在并行进程/线程中运行它们:

    import sys
    import itertools
    import concurrent.futures
    
    from loremipsum import get_sentences
    
    executor = concurrent.futures.ProcessPoolExecutor(workers=sys.cpu_count())
    
    def per_chunk(iterable, n=1):
        while True:
            chunk = tuple(itertools.islice(iterable, n))
            if chunk:
                yield chunk
            else:
                break
    
    def non_async_func(text):
        return text[::-1]
    
    def process_batches(batches):
        futures = [
            executor.submit(non_async_func, batch)
            for batch in batches
        ]
        concurrent.futures.wait(futures)    
    
    thousand_texts = get_sentences(1000)
    process_batches(per_chunk(thousand_texts, n=20))
    

    如果您仍想使用异步函数来处理批次,那么 asyncio 提供了围绕并发期货的异步包装器:

    async def process_batches(batches):
        event_loop = asyncio.get_running_loop()
        futures = [
            event_loop.run_in_executor(executor, non_async_func, batch)
            for batch in batches
        ]
        await asyncio.wait(futures)
    
    thousand_texts = get_sentences(1000)
    asyncio.run(process_batches(per_chunk(thousand_texts, n=20)))
    

    但它没有任何优势,除非您有其他可以在等待时运行的异步函数。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2012-10-12
      • 1970-01-01
      • 2014-10-05
      • 2019-10-02
      • 2020-10-25
      • 2013-12-27
      相关资源
      最近更新 更多