【问题标题】:Airflow start multiple concurrent generic tasks气流启动多个并发通用任务
【发布时间】:2018-09-04 16:05:44
【问题描述】:

尝试在 Cloud Composer 上同时完成一些任务:

arr = {}
for i in xrange(3):
    print("i: " + str(i))
    command_formatted = command_template.format(str(i))
    create_training_instance = bash_operator.BashOperator(
        task_id='create_training_instance',
        bash_command=command_formatted)
    arr[i] = create_training_instance
    start_training.set_downstream(arr[i])  

得到以下错误:

破碎的 DAG:[/home/airflow/gcs/dags/scale_simple.py] 依赖 , create_training_instance 已经 已注册

【问题讨论】:

    标签: airflow google-cloud-composer


    【解决方案1】:

    task_id 对于单个任务应该始终是唯一的。因此,您可以使用 create_training_instance_{}.format(i) 之类的东西作为 task_id

    【讨论】:

      【解决方案2】:

      您还需要参数化您的任务 ID,例如, task_id='create_training_instance' --> 'create_traiing_instance-{}'.format(i)

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-10-12
        • 2021-01-25
        • 1970-01-01
        • 1970-01-01
        • 2017-01-01
        • 2018-12-10
        相关资源
        最近更新 更多