【问题标题】:Airflow scheduling issues气流调度问题
【发布时间】:2018-07-27 14:39:31
【问题描述】:

像上次我忘记包含 code 示例一样重新发布。

我在尝试设置 Airflow 时遇到了一些问题。我一直在尝试使用一个简单的 SQL 脚本来遵循一个非常基本的 DAG 设置。我遇到了三个问题:

  1. 我能够让基本的DAG 运行(我可以在 GUI 中看到 DAG 以及它在数据库上运行的效果),但似乎 DAG 不尊重schedule_interval 参数。我试图让DAG 每小时运行一次,但无论我做出什么改变,DAG 都会按日运行,这也反映在我从 GUI 中看到的内容中。

    李> 1234563 .我不确定为什么会这样。
  2. 在更新 DAG start_dateschedule_interval 时,我一直在重命名 dag_id,因为这似乎是推荐的做法。这有时会触发回填DAG 执行,但这些回填作业也总是失败。

Airflow 配置是默认配置,除了更改donot_pickle = True。非常感谢任何人在上述三个问题上提供的任何帮助,因为到目前为止它们对我来说似乎难以理解。

气流:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime, timedelta

DEFAULT_ARGS = {
    'description': 'For now, a test DAG',
    'start_date': datetime(2018, 7, 19),
    'schedule_interval': "@hourly",
    'end_date': None,
    'owner': 'airflow', #placeholder
    'retries': 3,
}

dag = DAG(
    dag_id='test',
    default_args = DEFAULT_ARGS,
    )

test_redshift_op = PostgresOperator(
    task_id = 'test_redshift_op',
    sql = 'test.sql',
    postgres_conn_id = 'redshift',
    dag = dag
    )

test_redshift_op

SQL:

CREATE TABLE IF NOT EXISTS test(
  id integer,
  words varchar(255)
)
;

INSERT INTO test(
  id,
  words
)
VALUES
(1, 'test'),
(2, 'finish')
;

【问题讨论】:

  • 您无需转发。请根据需要编辑问题。
  • 2) 我在手动执行 DAG 时所做的每一次尝试都失败了。怎么样?
  • 3) 这些回填的工作也总是失败 怎么办?请包含更多信息,例如堆栈跟踪或您正在获取的特定错误以及日志信息。
  • @PaulCrovella 我已将另一个标记为与此重复,因为这个已进一步发展。

标签: airflow


【解决方案1】:

1) schedule_interval 不是 default_args 的一部分。出于好奇-该代码示例来自哪里? schedule_intervalDAG 定义的一部分:

dag = DAG(
  dag_id='test',
  default_args = DEFAULT_ARGS,
  schedule_interval = '@hourly',
)

对于 2) 和 3),给出了出错的信息,但没有具体说明出错的原因。

【讨论】:

  • 感谢您容忍所有的歧义。当我将参数添加到适当的对象时,上面提到的所有问题实际上都得到了解决。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-12-15
  • 2021-06-09
  • 1970-01-01
相关资源
最近更新 更多