【问题标题】:Schedule airflow job bi-weekly每两周安排一次气流作业
【发布时间】:2020-08-24 16:41:57
【问题描述】:

我有一个要求,我想每隔一个星期五安排一次气流作业。但是,问题是我无法弄清楚如何为此编写时间表。

我不想为此有多个工作。

我试过了

'0 0 1-7,15-21 * 5

但是它不起作用,它每天从 1 点到 7 点和 15 点到 21 点运行。

从shubham 的回答中,我意识到我们可以拥有一个可以为我们跳过任务的PythonOperator。我试图实施解决方案。不过好像没用。

因为在 2 周内进行测试太难了。这就是我所做的。

  • 我安排 DAG 每 5 分钟运行一次
  • 但是,我正在编写 python 运算符跳过备用任务(与我正在尝试做的非常相似,交替星期五)。

DAG:

args = {
    'owner': 'Gaurang Shah',
    'retries': 0,
    'start_date':airflow.utils.dates.days_ago(1),
}


dag = DAG(
    dag_id='test_dag',
    default_args=args,
    catchup=False,
    schedule_interval='*/5 * * * *',
    max_active_runs=1
    )


dummy_op = DummyOperator(task_id='dummy', dag=dag)

def _check_date(execution_date, **context):
    min_date = datetime.now() - relativedelta(minutes=10)
    print(context)
    print(context.get("prev_execution_date"))
    print(execution_date)
    print(datetime.now())
    print(min_date)
    if execution_date > min_date:
        raise AirflowSkipException(f"No data available on this execution_date ({execution_date}).")

check_date = PythonOperator(
    task_id="check_if_min_date",
    python_callable=_check_date,
    provide_context=True,
    dag=dag,
)

【问题讨论】:

    标签: airflow airflow-scheduler


    【解决方案1】:
    • 我怀疑单个crontab 表达式can solve this

    • 使用airflow 的技巧,解决方案更简单:

      在这里,您必须让您的 DAG 以一个专用的 skip_decider 任务开始,这将使您的 DAG 每隔一个星期五运行/跳过一次

      • 有条件地提高 AirflowSkipException(跳过 DAG)
      • 不做任何事情让 DAG 运行

    你也可以利用

    但 IMO,AirflowSkipException 是最干净的解决方案


    参考:How to define a DAG that scheduler a monthly job together with a daily job?

    【讨论】:

    • 似乎是一个不错的替代方案。您能否详细说明如何跳过第二次运行?
    • 感谢您的回复。我浏览了帖子,但是我无法理解其中的逻辑。 execution_date > (datetime.datetime.now() - relativedelta(weeks=1)) 不会 exceution_datedateime.datetime.now 相同。我对此进行了测试,两者都相同。
    • @感谢您的帖子。我理解不同。在我的情况下,两者都显示相同,只有 5-10 秒的差异。我创建了一个在 5 分钟触发的 dag。如果持续时间少于 10 分钟,我将跳过任务。但是它不起作用。可能我会在这里打开一个新问题或更新
    【解决方案2】:

    根据您的实现,您可以使用哈希。使用 1.10 版在我的气流时间表中工作:

    哈希 (#) '#' 允许用于星期几字段,并且后面必须跟一个介于 1 到 5 之间的数字。它允许指定构造,例如给定月份的“第二个星期五”。 [19]例如,在星期几字段中输入“5#3”对应于每个月的第三个星期五。 Reference

    【讨论】:

      【解决方案3】:

      您可以使用下面提到的 timedelta,将它与 start_date 结合起来安排您的工作 bi_weekly。

         dag = DAG(
             dag_id='test_dag',
             default_args=args,
             catchup=False,
             start_date=datetime(2021, 3, 26),
             schedule_interval=timedelta(days=14),
             max_active_runs=1
         )
      

      【讨论】:

      • 这不能像每个月的天数一样工作。
      猜你喜欢
      • 2017-10-31
      • 1970-01-01
      • 2022-07-27
      • 1970-01-01
      • 1970-01-01
      • 2015-03-23
      • 2012-12-07
      相关资源
      最近更新 更多