【问题标题】:Airflow dependency error when instantiating multiple tasks via a 'for' loop通过“for”循环实例化多个任务时出现气流依赖性错误
【发布时间】:2017-07-21 00:25:48
【问题描述】:

我正在运行这个 DAG。它从dash_workers.py 导入函数(尚未包括在内——这会有帮助吗?)并将这些函数实现为PythonOperator 定义的任务。 我正在使用气流版本 1.8.0

from datetime import datetime, timedelta
import os
import sys

import airflow.models as af_models
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
  'start_date': datetime(2017, 7, 18),
  'schedule_interval': None
}

DAG = af_models.DAG(
  dag_id='dash_preproc',
  default_args=default_args
)

get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=dash_workers.get_id_creds, 
    provide_context=True,
    dag=DAG)

with open('/tmp/ids.txt', 'r') as infile:
    ids = infile.read().splitlines()

for uid in ids:
    print('Building transactions for {}'.format(uid))
    upload_transactions = PythonOperator(
        task_id='upload_transactions',
        python_callable=dash_workers.upload_transactions, 
        op_args=[uid],
        dag=DAG)
    upload_transactions.set_upstream(get_id_creds)

这会导致:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 263, in process_file
    m = imp.load_source(mod_name, filepath)
  File   "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/pyth on3.6/imp.py", line 172, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 675, in _load
  File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 205, in _call_with_frames_removed
  File "/Users/aaronpolhamus/airflow/dags/dash_dag.py", line 47, in  <module>
    upload_transactions.set_upstream(get_id_creds)
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2478, in set_upstream
    self._set_relatives(task_or_task_list, upstream=True)
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2458, in _set_relatives
    task.append_only_new(task._downstream_task_ids, self.task_id)
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2419, in append_only_new
    ''.format(**locals()))
airflow.exceptions.AirflowException: Dependency <Task(PythonOperator):   get_rfc_creds>, upload_transactions already registered
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 573, in test
    dag = dag or get_dag(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 126, in get_dag
    'parse.'.format(args.dag_id))
airflow.exceptions.AirflowException: dag_id could not be found:    dash_preproc. Either the dag did not exist or it failed to parse.

这里的应用程序是我使用函数get_id_creds 从 SQL 表中提取 ID 列表,然后基于每个 ID 生成详细的数据配置文件。这两个函数在内部都使用MySqlHook,并且我已经在独立的基础上测试了每个函数/任务,以确保它们单独产生预期的行为(它们确实如此)。

错误的症结似乎是airflow.exceptions.AirflowException: Dependency &lt;Task(PythonOperator): get_rfc_creds&gt;, upload_transactions already registered 这一行。这似乎表明在第一次通过循环时任务被“注册”,然后在第二次通过时解析器抱怨它已经完成了该操作。 This example script 让我在这里做的事情看起来很容易:只需将您的下游任务嵌入到 for 循环中。不知道为什么这会失败。

我准备使用 LocalExecutor 进行本地并行处理。我的理解是,如果我能做到这一点,我可以在同一台机器上并行运行多个数据配置文件生成作业。

这个错误来自哪里,我怎样才能让这个脚本工作?

【问题讨论】:

  • 请指定您使用的气流版本!

标签: python python-3.x airflow


【解决方案1】:

与您的问题没有直接关系,但您不需要导入airflow.models,只需执行from airflow.models import DAG 并进行必要的更改。

您指出了一个示例,该示例显示了一个带有PythonOperator 动态生成任务的 DAG,但您似乎不太了解它。

在您的情况下,您必须动态分配任务名称,以便每个新任务都可以注册并显示在网络服务器中。

for idx, uid in enumerate(ids):
    print('Building transactions for {}'.format(uid))
    upload_transactions = PythonOperator(
        task_id='upload_transactions_'+str(idx),
        python_callable=dash_workers.upload_transactions, 
        op_args=[uid],
        dag=DAG)

通过在任务名称中添加当前uid 的索引,每个任务将获得一个唯一的名称。我没有为此使用uid,因为我不知道每个元素在您的列表中是否是唯一的。如果是这样,您可以删除 enumerate() 并使用 uid

我希望这会有所帮助。 干杯!

【讨论】:

  • 这非常有用,谢谢!这几乎肯定是问题所在。当我回到办公室时,我会实施解决方案,如果问题解决了,我会回来选择你的答案。
  • (我还将编辑帖子以包含我正在使用的气流版本)
  • 修复了我的图表,@sdikby,感谢您的帮助。我投票支持您的答案,并包含上面的 Airflow 版本号(1.8.0)。如果你有兴趣I've posted another question here about exchanging data between tasks.
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-11-16
  • 1970-01-01
  • 2016-11-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多