【问题标题】:using iterator in prefect task "batching"在完美任务“批处理”中使用迭代器
【发布时间】:2021-07-04 20:31:48
【问题描述】:

我正在使用 prefect 并定义一个 flow 以使用 cosmos db 插入文档。

问题是query_items() 调用是可迭代的,对于大型容器,没有办法将所有条目保存在内存中。

我相信我的问题可以简化为:

  • given an iterator, how can I create batches to be processed (mapped) in a prefect flow?

例子:

def big_iterable_function_i_cannot_change():
    yield from range(1000000) # some large amount of work

@task
def some_prefect_batching_magic(x):
    # magic code here
    pass


with Flow("needs-to-be-batched"):
    some_prefect_batching_magic.map(big_iterable_function_i_cannot_change())

上面的代码,或者类似的东西会给我一个错误:

prefect.FlowRunner | Flow run FAILED: some reference tasks failed.

【问题讨论】:

    标签: python parallel-processing azure-cosmosdb prefect


    【解决方案1】:

    您收到此错误是因为您没有将big_iterable_function_i_cannot_change 定义为taskprefect 实际上并不直接执行 flowflow 用于创建schedule,(用dask 的说法)——然后用于执行流程(据我所知)。 prefect 中的并行化仅在与 dask executor 一起使用时发生。

    这是我对您的flow 的看法。但是,如果您无法将任务装饰器添加到big_iterable_function_i_cannot_changetask 中,请将其包装在任务中。最后 - 不确定您是否可以将生成器传递给映射任务。

    import prefect
    from prefect import Flow, Parameter, task
    
    @task
    def big_iterable_function_i_cannot_change():
        return range(5) # some large amount of work
    
    @task
    def some_prefect_batching_magic(x):
        # magic code here
        pass
    
    
    with Flow("needs-to-be-batched") as flow:
        itter_res = big_iterable_function_i_cannot_change()
        post_process_res = some_prefect_batching_magic.map(itter_res)
    
    flow.visualize()
    state = flow.run()
    
    
    flow.visualize(flow_state=state)
    

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-11-04
    • 2014-06-26
    • 1970-01-01
    • 1970-01-01
    • 2010-09-06
    • 1970-01-01
    • 2011-07-01
    • 1970-01-01
    相关资源
    最近更新 更多