【发布时间】:2021-07-04 20:31:48
【问题描述】:
我正在使用 prefect 并定义一个 flow 以使用 cosmos db 插入文档。
问题是query_items() 调用是可迭代的,对于大型容器,没有办法将所有条目保存在内存中。
我相信我的问题可以简化为:
given an iterator, how can I create batches to be processed (mapped) in a prefect flow?
例子:
def big_iterable_function_i_cannot_change():
yield from range(1000000) # some large amount of work
@task
def some_prefect_batching_magic(x):
# magic code here
pass
with Flow("needs-to-be-batched"):
some_prefect_batching_magic.map(big_iterable_function_i_cannot_change())
上面的代码,或者类似的东西会给我一个错误:
prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
【问题讨论】:
标签: python parallel-processing azure-cosmosdb prefect