【问题标题】:In Prefect, can a task value be cached for the duration of the flow run?在 Prefect 中,是否可以在流程运行期间缓存任务值?
【发布时间】:2020-10-06 04:53:56
【问题描述】:

我有一个使用.map() 的流程;因此,我“循环”了多个输入,但是有些输入我只需要生成一次,但我注意到我的流程不断重新生成它们。

是否可以在运行期间缓存/检查任务的结果(用于其他任务)?

我的理解是可以像这样缓存一段特定的时间:

import datetime

from prefect import task

@task(cache_for=datetime.timedelta(hours=1))
def some_task():
    ...

但是,如果运行时间少于cache_for 时间,缓存是否仍会为下一次运行保留(如果不是,我猜长时间缓存会起作用)。

【问题讨论】:

    标签: python-3.x caching etl prefect


    【解决方案1】:

    是的,有几种不同的方法可以实现这种类型的缓存:

    使用不同的缓存验证器

    除了配置您的缓存过期时间(如上所述),您还可以选择配置cache validator。在您的情况下,您可以使用输入或参数验证器。

    使用缓存键

    您可以通过在任务上指定cache_key 在任务之间“共享”缓存(在单个流内和跨流):

    @task(cache_for=datetime.timedelta(hours=1), cache_key="my-key")
    def some_task():
        ...
    

    然后,这将通过键而不是任务 ID 查找您的候选 Cached 状态。

    使用基于文件的目标

    最后,越来越流行的设置是使用file-based target for your task。然后,您可以使用 flow_run_id 之类的内容以及为您的任务提供的输入来模板化此目标字符串。每当任务运行时,它首先检查指定目标位置是否存在数据,如果找到,则不会重新运行。例如:

    @task(target="{flow_run_id}/{scheduled_start_time:%Y-%d-%m}/results.bytes")
    def some_task():
        ...
    

    如果满足以下两个条件,则此模板具有重用目标数据的效果:

    • 任务在同一天内重新运行
    • 任务作为同一流运行的一部分重新运行

    然后,您可以跨多个任务(或者在您的情况下,跨所有映射的子任务)共享此模板。

    请注意,如果您愿意,您还可以向 target 模板提供输入和参数。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-02-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多