【问题标题】:execution_date in airflow: need to access as a variable气流中的执行日期:需要作为变量访问
【发布时间】:2016-08-12 08:37:29
【问题描述】:

我真的是这个论坛的新手。但我一直在为我们公司玩气流。对不起,如果这个问题听起来很愚蠢。

我正在使用一堆 BashOperators 编写管道。 基本上,对于每个任务,我想简单地使用'curl'调用一个 REST api

这就是我的管道的样子(非常简化的版本):

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
    'email': ['xxxx@xxx.xxx'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}


current_datetime = datetime_obj.now(tz=tz.tzlocal())

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))

curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'


t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)

如果你注意到我在做current_datetime= datetime_obj.now(tz=tz.tzlocal()) 相反,我想要的是 'execution_date'

如何直接使用 'execution_date' 并将其分配给我的 python 文件中的变量?

我遇到了访问 args 的一般问题。 任何帮助将不胜感激。

谢谢

【问题讨论】:

    标签: airflow


    【解决方案1】:

    BashOperatorbash_command 参数是一个模板。您可以使用execution_date 变量将execution_date 作为datetime object 在任何模板中访问。在模板中,您可以使用任何jinja2 方法对其进行操作。

    使用以下作为您的BashOperator bash_command 字符串

    # pass in the first of the current month
    some_command.sh {{ execution_date.replace(day=1) }}
    
    # last day of previous month
    some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}
    

    如果您只想要与执行日期等效的字符串,ds 将返回一个日期戳 (YYYY-MM-DD),ds_nodash 返回相同的不带破折号 (YYYYMMDD) 等。更多关于 macros 的信息可用在Api Docs


    您的最终运算符如下所示:

    command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
    t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)
    

    【讨论】:

    • 这是正确答案。我只是编辑它以显示任务的完整版本,例如t1 = BashOperator( task_id='rest-api-1', bash_command='curl -XPOST "'+hostname+':8000/run?st={{ execution_date }}"', dag=dag)
    • 想用Python3 fstrings 信息更新这个,command =f """...""" 似乎不起作用。要使jinja2 模板正常工作,我认为您不能使用 fstrings
    • 如果我的 dag 每小时运行一次,并且当前我的 dag 的执行日期为 2021-06-03 08:00:00,我使用 {{execution_date}} 访问此日期,它返回正常,但是我现在的问题是当前时间是 09:00:00,那么当我访问它时,我的 dag 的执行日期将发生变化,否则它将始终保持不变。
    【解决方案2】:

    PythonOperator 构造函数采用“provide_context”参数(参见https://pythonhosted.org/airflow/code.html)。如果它是 True,那么它会通过 kwargs 将一些参数传递给 python_callable。 kwargs['execution_date'] 是你想要的,我相信。

    类似这样的:

    def python_method(ds, **kwargs):
        Variable.set('execution_date', kwargs['execution_date'])
        return
    
    doit = PythonOperator(
        task_id='doit',
        provide_context=True,
        python_callable=python_method,
        dag=dag)
    

    我不确定如何使用 BashOperator 进行操作,但您可能会从这个问题开始:https://github.com/airbnb/airflow/issues/775

    【讨论】:

    • 谢谢。使用这种方法,我将有一个任务 t1,它将是 PythonOperator 的一个实例,提供_context = true,这让我可以使用 kwargs['execution_date'],我将在其中设置并返回 current_datetime = 'execution_date'。然后我创建我的任务 t2:BashOperator:我将在其中拉(使用 XCOM)并使用我的变量。所以你看,我必须创建 2 个任务。这有点不性感;)我确信(我希望我是对的)有一种方法可以直接在 python 代码中访问“execution_date”,而无需使用 PythonOperator。但我不知道该怎么做:(
    • 您可以只使用 Python 的本机子进程库从 Python 函数/运算符中运行 bash 命令。 docs.python.org/3/library/subprocess.html
    【解决方案3】:

    我认为您不能使用来自任务实例之外的气流上下文的值来分配变量,它们仅在运行时可用。在气流中加载和执行 dag 时,基本上有 2 个不同的步骤:

    • 首先解释和解析您的 dag 文件。它必须工作和编译,并且任务定义必须正确(没有语法错误或任何东西)。在此步骤中,如果您调用函数来填充某些值,这些函数将无法访问气流上下文(例如执行日期,如果您正在执行一些回填,则更是如此)。

    • 第二步是执行dag。只有在第二步中,气流 (execution_date, ds, etc...) 提供的变量才可用,因为它们与 dag 的执行有关。

    因此您无法使用 Airflow 上下文初始化全局变量,但是,Airflow 为您提供了多种机制来实现相同的效果:

    1. 在您的命令中使用 jinja 模板(它可以在代码中的字符串或文件中,两者都将被处理)。您可以在此处查看可用模板列表:https://airflow.apache.org/macros.html#default-variables。请注意,某些函数也可用,尤其是用于计算天数增量和日期格式。

    2. 使用传递上下文的 PythonOperator(使用 provide_context 参数)。这将允许您使用语法kwargs['<variable_name'] 访问相同的模板。如果需要,您可以从 PythonOperator 返回一个值,该值将存储在 XCOM 变量中,您以后可以在任何模板中使用。访问 XCOM 变量使用以下语法:https://airflow.apache.org/concepts.html#xcoms

    3. 如果您编写自己的运算符,则可以使用字典 context 访问气流变量。

    【讨论】:

    • 技术上有 3 种方法可以做到这一点,正如上面其他问题中指出的那样。使用 jinja 模板,在 python_callable 中使用 kwargs,或者在运算符中使用 context['execution_date']。可能最好完全删除这个答案,或者至少删除大部分。
    • 感谢您的提醒,自从我写了这个答案以来,我学到了很多关于气流的知识,我对其进行了编辑以使其更加正确和准确!
    • 我做了一些小的修改,以使您的第一个摘要陈述与以下 2 点一致。我认为这个答案现在是正确的,尽管您可以添加更多代码示例来加分。
    • 这是最正确的答案 - 问题是“如何直接使用 'execution_date' 并将其分配给我的 python 文件中的变量?” - 鉴于没有引用 python 文件(不使用 python 运算符),(正确的)假设是 DAG 中需要它,而你不能这样做,这个答案说。
    • 我认为这个答案应该被接受。它解释了事情是怎样的以及为什么会这样。
    【解决方案4】:
    def execute(self, context):
        execution_date = context.get("execution_date")
    

    这应该在 Operator 的 execute() 方法中

    【讨论】:

    • 如果您正在构建自定义运算符,这可能是您想要的。
    • 我相信它也可以在pre_execute / post_execute 方法中使用
    【解决方案5】:

    要在 PythonOperator 的可调用函数中打印执行日期,您可以在 Airflow 脚本中使用以下内容,也可以添加 start_timeend_time,如下所示:

    def python_func(**kwargs):
        ts = kwargs["execution_date"]
        end_time = str(ts)
        start_time = str(ts.add(minutes=-30))
    

    我已将日期时间值转换为字符串,因为我需要在 SQL 查询中传递它。我们也可以使用它。

    【讨论】:

      【解决方案6】:

      你可以考虑 SimpleHttpOperator https://airflow.apache.org/_api/airflow/operators/http_operator/index.html#airflow.operators.http_operator.SimpleHttpOperator。发出http请求就是这么简单。您可以通过模板将 execution_date 与端点参数一起传递。

      【讨论】:

        【解决方案7】:

        这是另一种没有上下文的方法。使用 dag 的最后执行时间对于计划的 ETL 作业非常有帮助。例如“下载所有新添加的文件”的 dag。使用 dag 的最后执行日期作为时间过滤器,而不是硬编码 datetime.datetime。

        Airflow Dags 实际上有一个名为 DagRun 的类,可以像这样访问它:dag_runs = DagRun.find(dag_id=dag_id)

        这是获取最近运行的执行时间的简单方法:

        def get_most_recent_dag_run(dag_id):
        dag_runs = DagRun.find(dag_id=dag_id)
        dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
        return dag_runs[1] if len(dag_runs) > 1 else None
        

        然后,在您的 pythonOperator 中,您可以通过调用您在上面创建的函数来动态访问 dag 的最后一次执行:

        last_execution = get_most_recent_dag_run('svb_to_s3')
        

        现在它是一个变量!

        【讨论】:

          猜你喜欢
          • 2021-01-03
          • 2021-07-18
          • 2018-05-28
          • 1970-01-01
          • 2021-10-16
          • 2014-12-13
          • 2021-10-02
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多