【发布时间】:2022-01-11 04:16:52
【问题描述】:
我已经阅读了一些关于 SO 的类似问题,为了向已经运行的事件循环添加协程,一些答案是针对所提出的问题量身定制的,因此不适用于我的情况;最常见的是asyncio.ensure_future(coro(), loop=my_running_loop),或者对于线程安全版本,它将是asyncio.run_coroutine_threadsafe(coro(), my_running_loop);最后的手段是在当前循环结束后生成另一个循环。
我将首先提出我的问题:
- 为什么前两种方法不起作用?
- 除了第三种方法之外,还有其他方法吗,最好在类中完成,这样我就不必在类固有的循环完成后启动另一个循环?
我的场景的一个简化示例是,有 1000 个项目位于 REST-API 的另一端,我必须将它们全部拉出来进行一些分析。但是 REST-API 取决于网络连接,因此我可能会间歇性地遇到一些项目只是超时错误(或为空);在这种情况下,我必须再次调用 REST-API 来获取那些失败的。
只是为了演示我的场景,没有所有粒度(数据特定而不是代码逻辑):
import asyncio
import ItemType # Enum
import async_rest_api
import pandas as pd
class DataSource:
def __init__(self):
# some settings on self
# particularly:
# self.loop = asyncio.get_event_loop()
# self.item_names_to_process = some_df # pd.DataFrame of one column named 'name'
# self.unprocessed_item_names = []
# self.result = pd.DataFrame()
# super().__init__()
def get_item_fetching_function(item_type: ItemType):
if item_type == ItemType.One:
return async_rest_api.get_item_type_one
if item_type == ItemType.Two:
return async_rest_api.get_item_type_two
if item_type == ItemType.Three:
return async_rest_api.get_item_type_three
# args were of length greater than one,
# here I simplified it to just item_type and item_names.
async def get_items(self, item_type, item_names):
step_size = 500
results = []
for i in range(0, len(item_names), step_size):
tasks = []
for name in item_names[i:i+step_size]:
tasks.append(self.get_item_fetching_function(item_type)(name))
results.extend(await asyncio.gather(*tasks, return_exception=True))
result = pd.concat([r[1] for r in results])
processed_items_names = pd.DataFrame(results.name.unique(), columns=['name'])
unprocessed = self.item_names_to_process.append(processed_items_names).drop_duplicates(keep=False)
self.unprocessed = unprocessed.name.tolist()
self.result = pd.concat([self.result, result])
# Trial: I wrote another line here to add
# a coroutine to process the unprocessed items.
# I tried both in-thread and cross-thread,
# neither have worked and no error was thrown;
# the code just finished silently.
# in the running thread
asyncio.ensure_future(self.get_items(item_type, self.unprocessed), loop=self.loop)
# or in another thread
asyncio.run_coroutine_threadsafe(self.get_items(item_type, self.unprocessed), self.loop)
def wrapper_function(self, item_type):
self.loop.run_until_complete(self.get_items(item_type, self.item_names_to_process))
if __names__ == "__main__":
import DataSource
import ItemType
import asyncio
data_source = DataSource()
names = ''.join(random.choices(string.ascii_uppercase + string.digits, k=1000))
data_source.wrapper_function(ItemType.One)
# Trial: the last resort would be to check on the `self.unprocessed` here, if it has values, I'll re-run `self.get_items`. This method worked, as expected.
if len(data_source.unprocessed):
asyncio.run(data_source.get_items(ItemType.One, data_source.unprocessed))
【问题讨论】:
标签: python multithreading async-await concurrency python-asyncio