【问题标题】:airflow TriggerDagRunOperator how to change the execution date气流TriggerDagRunOperator如何更改执行日期
【发布时间】:2018-05-28 06:11:00
【问题描述】:

我注意到对于计划任务,执行日期是根据过去设置的

Airflow 是为满足 ETL 需求而开发的。在 ETL 世界中, 您通常会汇总数据。所以,如果我想总结数据 2016 年 2 月 19 日,我会在格林威治标准时间 2016 年 2 月 20 日午夜进行,这将是 在 2016-02-19 的所有数据可用之后。

但是,当一个 dag 触发另一个 dag 时,执行时间设置为 now()。

有没有办法让触发的 dag 具有与触发 dag 相同的执行时间?当然,我可以重写模板并使用昨天_ds,但是,这是一个棘手的解决方案。

【问题讨论】:

    标签: triggers airflow


    【解决方案1】:

    以下类在TriggerDagRunOperator 上扩展,允许将执行日期作为字符串传递,然后再转换回日期时间。这有点 hacky,但这是我发现完成工作的唯一方法。

    from datetime import datetime
    import logging
    
    from airflow import settings
    from airflow.utils.state import State
    from airflow.models import DagBag
    from airflow.operators.dagrun_operator import TriggerDagRunOperator, DagRunOrder
    
    class MMTTriggerDagRunOperator(TriggerDagRunOperator):
        """
        MMT-patched for passing explicit execution date
        (otherwise it's hard to hook the datetime.now() date).
        Use when you want to explicity set the execution date on the target DAG
        from the controller DAG.
    
        Adapted from Paul Elliot's solution on airflow-dev mailing list archives:
        http://mail-archives.apache.org/mod_mbox/airflow-dev/201711.mbox/%3cCAJuWvXgLfipPmMhkbf63puPGfi_ezj8vHYWoSHpBXysXhF_oZQ@mail.gmail.com%3e
    
        Parameters
        ------------------
        execution_date: str
            the custom execution date (jinja'd)
    
        Usage Example:
        -------------------
        my_dag_trigger_operator = MMTTriggerDagRunOperator(
            execution_date="{{execution_date}}"
            task_id='my_dag_trigger_operator',
            trigger_dag_id='my_target_dag_id',
            python_callable=lambda: random.getrandbits(1),
            params={},
            dag=my_controller_dag
        )
        """
        template_fields = ('execution_date',)
    
        def __init__(
            self, trigger_dag_id, python_callable, execution_date,
            *args, **kwargs
            ):
            self.execution_date = execution_date
            super(MMTTriggerDagRunOperator, self).__init__(
                trigger_dag_id=trigger_dag_id, python_callable=python_callable,
               *args, **kwargs
           )
    
        def execute(self, context):
            run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
            dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
            dro = self.python_callable(context, dro)
            if dro:
                session = settings.Session()
                dbag = DagBag(settings.DAGS_FOLDER)
                trigger_dag = dbag.get_dag(self.trigger_dag_id)
                dr = trigger_dag.create_dagrun(
                    run_id=dro.run_id,
                    state=State.RUNNING,
                    execution_date=self.execution_date,
                    conf=dro.payload,
                    external_trigger=True)
                logging.info("Creating DagRun {}".format(dr))
                session.add(dr)
                session.commit()
                session.close()
            else:
                logging.info("Criteria not met, moving on")
    

    在使用它而不设置 execution_date=now() 时可能会遇到一个问题:如果您尝试使用相同的 execution_date 两次启动 dag,您的操作员将引发 mysql 错误。这是因为execution_datedag_id用于创建行索引,不能插入具有相同索引的行。

    无论如何,我想不出你会想在生产中使用相同的execution_date 运行两个相同的 dag 的原因,但这是我在测试时遇到的问题,你不应该对此感到惊慌。只需清除旧作业或使用其他日期时间即可。

    【讨论】:

    • 这是一个很好的解决方案,远不是最好的,它仍然有帮助(很多)。
    • 当前 (v1.9.0) 气流的索引位于 (dag_id, run_id)... 您对早期气流版本的 sql 错误有何评论?
    • 我正在运行 v1.8.0,所以你可能是对的,它在 1.9+ 中不是问题
    • 我正在运行气流 1.9.0 并且我有同样的错误。 sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "dag_run_dag_id_execution_date_keyKey (dag_id, execution_date) already exists.
    【解决方案2】:

    TriggerDagRunOperator 现在有一个execution_date 参数来设置触发运行的执行日期。 不幸的是,该参数不在模板字段中。 如果它将被添加到模板字段中(或者如果您覆盖运算符并更改 template_fields 值),则可以像这样使用它:

    my_trigger_task= TriggerDagRunOperator(task_id='my_trigger_task',
                                                  trigger_dag_id="triggered_dag_id",
                                                  python_callable=conditionally_trigger,
                                                  execution_date= '{{execution_date}}',
                                                  dag=dag)
    

    它尚未发布,但您可以在此处查看来源: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/dagrun_operator.py

    进行更改的提交是: https://github.com/apache/incubator-airflow/commit/089c996fbd9ecb0014dbefedff232e8699ce6283#diff-41f9029188bd5e500dec9804fed26fb4

    【讨论】:

      【解决方案3】:

      我对 MMTTriggerDagRunOperator 做了一些改进。该函数检查 dag_run 是否已存在,如果找到,则使用气流的 clear 功能重新启动 dag。这使我们能够在 dag 之间创建依赖关系,因为将执行日期移动到触发的 dag 的可能性打开了整个宇宙的惊人可能性。我想知道为什么这不是气流的默认行为。

         def execute(self, context):
              run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
              dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
              dro = self.python_callable(context, dro)
              if dro:
                  session = settings.Session()
                  dbag = DagBag(settings.DAGS_FOLDER)
                  trigger_dag = dbag.get_dag(self.trigger_dag_id)
      
                  if not trigger_dag.get_dagrun( self.execution_date ):
                      dr = trigger_dag.create_dagrun(
                             run_id=dro.run_id,
                             state=State.RUNNING,
                             execution_date=self.execution_date,
                             conf=dro.payload,
                             external_trigger=True
                      )
                      logging.info("Creating DagRun {}".format(dr))
                      session.add(dr)
                      session.commit()
                  else:
                      trigger_dag.clear( 
                          start_date = self.execution_date,
                          end_date = self.execution_date,
                          only_failed = False,
                          only_running = False,
                          confirm_prompt = False, 
                          reset_dag_runs = True, 
                          include_subdags= False,
                          dry_run = False 
                      )
                      logging.info("Cleared DagRun {}".format(trigger_dag))
      
                  session.close()
              else:
                  logging.info("Criteria not met, moving on")
      

      【讨论】:

        【解决方案4】:

        气流的实验性 API 部分中有一个可用的函数,可让您触发具有特定执行日期的 dag。
        https://github.com/apache/incubator-airflow/blob/master/airflow/api/common/experimental/trigger_dag.py

        您可以将此函数作为 PythonOperator 的一部分调用并实现目标。

        所以它看起来像
        from airflow.api.common.experimental.trigger_dag import trigger_dag

        trigger_operator=PythonOperator(task_id='YOUR_TASK_ID',
                                        python_callable=trigger_dag,
                                        op_args=['dag_id'],
                                        op_kwargs={'execution_date': datetime.now()})
        

        【讨论】:

          猜你喜欢
          • 2021-07-18
          • 2021-01-03
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2016-08-12
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多