【问题标题】:How to trigger daily DAG run at midnight local time instead of midnight UTC time如何在当地时间午夜而不是午夜 UTC 时间触发每日 DAG 运行
【发布时间】:2017-11-04 15:24:55
【问题描述】:

我在 UTC+4 时区,所以当 Airflow 触发夜间 ETL 时,这里已经是凌晨 4:00。如何告诉 Airflow 在 ds-1 天 20:00 触发第 ds 天的运行,但使用 ds=ds?

根据文档,强烈建议将所有服务器保持在 UTC,这就是我寻找应用程序级解决方案的原因。

编辑:一个 hacky 解决方案是将其定义为每天晚上 20:00 运行,因此是“前一天”,然后在工作中使用 tomorrow_ds 而不是 ds。但这在 Airflow UI 上看起来仍然很奇怪,因为它会显示 UTC 执行时间。

【问题讨论】:

    标签: airflow apache-airflow airflow-scheduler


    【解决方案1】:

    调度间隔也可以是“cron 表达式”,这意味着您可以在 20:00 UTC 轻松运行它。再加上“user_defined_filters”意味着您可以通过一些技巧获得您想要的行为:

    from airflow.models import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime
    
    import pytz
    tz = pytz.timezone('Asia/Dubai')
    
    
    def localize_utc_tz(d):
        return tz.fromutc(d)
    
    default_args = {
        'start_date': datetime(2017, 11, 8),
    }
    dag = DAG(
        'plus_4_utc',
        default_args=default_args,
        schedule_interval='0 20 * * *',
        user_defined_filters={
            'localtz': localize_utc_tz,
        },
    )
    task = BashOperator(
            task_id='task_for_testing_file_log_handler',
            dag=dag,
            bash_command='echo UTC {{ ts }}, Local {{ execution_date | localtz }} next {{ next_execution_date | localtz }}',
    )
    

    这个输出:

    UTC 2017-11-08T20:00:00,本地 2017-11-09 00:00:00+04:00 下一个 2017-11-10 00:00:00+04:00

    您必须小心使用的变量的“类型”。例如 dsts 是字符串,而不是日期时间对象,这意味着过滤器不会对它们起作用

    【讨论】:

    • 遗憾的是,这对于 Airflow UI 中显示的时间并没有多大作用:(
    • Airflow 团队的核心开发人员已开始着手解决此问题,这意味着您将能够做到start_date = datetime(2017, 1, 1, tzinfo=“Europe/Amsterdam”)。请参阅github.com/apache/incubator-airflow/pull/2781 以跟踪此情况。它可能在 Airflow 1.10 中
    • Airflow 1.10 已经发布,但实际上它并没有帮助,因为模板中的 execution_date 保留在 UTC 中,除非您自己转换它...airflow.apache.org/timezone.html#templates 这是您正在回答的问题。真的不知道为什么他们引入了时区感知 DAG 但只用于触发 DAG,其余的仍然像以前一样工作。
    • 是的,UI 显示不正确(因为 Airflow 的核心在 UTC 中运行)issues.apache.org/jira/browse/AIRFLOW-2805 是更新 UI 中显示的时区的票
    • 这对我有用:tz = pendulum.timezone("Asia/Tehran") def local_ds(d): return tz.fromutc(d).date()
    【解决方案2】:

    我也遇到了同样的问题。我有每天、每小时、半小时的工作。

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta
    import pendulum
    
    local_tz = pendulum.timezone("Asia/Calcutta")
    
    args = {
        'owner': 'ganesh',
        'depends_on_past': False,
        'start_date': datetime(2020, 3, 25, tzinfo=local_tz),
        'email': ['abcd@test.com'],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 0,
        'retry_delay': timedelta(minutes=5),
    }
    
    dag = DAG(
        dag_id='test1',
        default_args=args,
        schedule_interval='30 00 * * *'
        )
    
    first_date = BashOperator(
        task_id='first_date'
        ,
        bash_command='date'
        , dag=dag, env=None, output_encoding='utf-8')
    
    second_date = BashOperator(
        task_id='second_date'
        ,
        bash_command='echo date'
        , dag=dag, env=None, output_encoding='utf-8')
    
    first_date >> second_date
    
    
    
    

    【讨论】:

      【解决方案3】:

      您可以编写一个 python 实用程序,将您的基于 tz 的时间表重写为 UTC? https://github.com/bloomberg/tzcron/blob/master/tzcron.py

      编辑:最近的提交使 Airflow Timezone 感知: https://github.com/apache/incubator-airflow/commit/f1ab56cc6ad3b9419af94aaa333661c105185883

      【讨论】:

      • 从 10 月 28 日开始提交使 airlfow 时区感知
      猜你喜欢
      • 1970-01-01
      • 2014-07-06
      • 1970-01-01
      • 1970-01-01
      • 2012-10-03
      • 2015-03-20
      • 2016-01-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多