【问题标题】:how to get latest execution time of a dag run in airflow如何在气流中获取 dag 运行的最新执行时间
【发布时间】:2021-03-03 20:03:30
【问题描述】:

我尝试了下面的代码,但我仍然遇到问题

from airflow.models DagModel

def get_latest_execution_date(**kwargs):

session = airflow.settings.Session()

f = open("/home/Insurance/InsuranceDagsTimestamp.txt","w+")

try:
    Insurance_last_dag_run = session.query(DagModel)
    for Insdgrun in Insurance_last_dag_run:
        if Insdgrun is None: 
            f.write(Insdgrun.dag_id+",9999-12-31"+"\n")
        else:
            f.write(Insdgrun.dag_id+","+ Insdgrun.execution_date+"\n")
except:
    session.rollback()
finally:
    session.close()

t1 = PythonOperator(
    task_id='records',
    provide_context=True,
    python_callable=get_latest_execution_date,
    dag=dag)

有什么方法可以修复和获取最新的 dag 运行时信息

【问题讨论】:

标签: airflow


【解决方案1】:

有多种方法可以获取 DagRun 的最新执行。一种方法是使用 Airflow DagRun 模型。

from airflow.models import DagRun

def get_most_recent_dag_run(dag_id):
    dag_runs = DagRun.find(dag_id=dag_id)
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    return dag_runs[0] if dag_runs else None


dag_run = get_most_recent_dag_run('fake-dag-id-001')
if dag_run:
    print(f'The most recent DagRun was executed at: {dag_run.execution_date}')

您可以在Airflow Docs located here 中找到有关 DagRun 模型及其属性的更多信息。

【讨论】:

    【解决方案2】:

    PythonOperator op_args 参数已模板化。

    callable 仅将最新的执行日期写入文件,因此您可以通过以下方式实现该功能:

    def store_last_execution_date(execution_date):
        '''Appends latest execution date to a file
        :param execution_date: The last execution date of the DagRun.
        '''
    
        with open("/home/Insurance/InsuranceDagsTimestamp.txt", "w+") as f:
            f.write(execution_date)
    
    
    t1 = PythonOperator(
             task_id="records",
             provide_context=True,
             python_callable=store_last_execution_date,
             op_args=[
                 "{{dag.get_latest_execution_date()}}",
             ],
             dag=dag
         )
    

    【讨论】:

    • 试过同样的sn-p,还是不行!
    • 使用 Airflow >=1.10.6,您可以在模板中使用宏。只需将 "{{dag.get_latest_execution_date()}}" 替换为 "{{ prev_execution_date }}" 注意 - 文本和左大括号之间的两个空格很重要。参考 Airflow 文档:airflow.apache.org/docs/apache-airflow/1.10.6/…
    猜你喜欢
    • 2019-01-26
    • 2017-09-06
    • 1970-01-01
    • 2019-01-08
    • 2018-05-06
    • 2022-11-02
    • 1970-01-01
    • 2021-05-03
    • 1970-01-01
    相关资源
    最近更新 更多