【问题标题】:Create tasks from lists - Airflow从列表创建任务 - Airflow
【发布时间】:2021-08-09 12:13:25
【问题描述】:

我有一个要求,我有一个类似列表的列表

[[a,b,c,d],[e,f,g,h],[i,j,k,l]]

现在我想在 DAG 中创建如下所示的任务

a >> b >> c >> d

e >> f >> g >> h

i >> j >> k >> l

感谢任何帮助。

【问题讨论】:

  • 分享你的尝试
  • a,b,.. 代表运算符吗?

标签: python list airflow airflow-scheduler directed-acyclic-graphs


【解决方案1】:

您可以使用方便的chain() 函数在 1 行中完成此操作。

from airflow.models.baseoperator import chain

[a,b,c,d,e,f,g,h,i,j,k,l] = [DummyOperator(task_id=f"{i}") for i in "abcdefghijkl"]
chain([a,e,i], [b,f,j], [c,g,k], [d,h,l])

【讨论】:

  • 这看起来很整洁。不错的答案! +1
【解决方案2】:

假设 a,b,c,... 是操作员 - 下面应该完成这项工作(模拟气流操作员)

class Operator:
    def __init__(self, name):
        self.name = name

    def set_downstream(self, other):
        print(f'{self}: setting {other} as downstream')

    def __str__(self) -> str:
        return self.name


a = Operator('a')
b = Operator('b')
c = Operator('c')
d = Operator('d')
e = Operator('e')
f = Operator('f')
lst = [[a, b, c], [e, f, d]]

for oper_lst in lst:
    for i in range(0, len(oper_lst) - 1):
        oper_lst[i].set_downstream(oper_lst[i + 1])

输出

a: setting b as downstream
b: setting c as downstream
e: setting f as downstream
f: setting d as downstream

【讨论】:

    【解决方案3】:
    op_lists = [[a,b,c,d],[e,f,g,h],[i,j,k,l]]
    for op_list in op_lists:
        for i in range(len(op_list) - 1):
            op_list[i] >> op_list[i + 1]
    

    编辑:我没有看到 balderman 的回答。他是第一个

    【讨论】:

      【解决方案4】:

      a,...,l 是节点并且 >> 描述弧,我们可以将嵌套列表编写为字典,然后将此对象用于有向无环图。根据您的数据(例如)嵌套列表和它们之间的连接,您可以调整代码。在上面的示例中,我们将三个列表转换为三个弧。可以像这样将列表对象指向字典:

      graph = [["a","b","c","d"],["e","f","g","h"],["i","j","k","l"]]
      
      dic_list = []
      z = {}
      for i in range(len(graph)):
          b = dict(zip(graph[i][::1], list(graph[i][1::1])))
          dic_list.append(b)
          z = {**z, **dic_list[i]}
      

      然后使用 python 文档中的这个标准代码来构建一个 DAG,如下所示:

         def find_all_paths(graph, start, end, path=[]):
              path = path + [start]
              if start == end:
                  return [path]
              if not graph.has_key(start):
                  return []
              paths = []
              for node in graph[start]:
                  if node not in path:
                      newpaths = find_all_paths(graph, node, end, path)
                      for newpath in newpaths:
                          paths.append(newpath)
              return paths
      

      这能回答你的问题吗?

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2023-02-03
        • 2017-09-26
        • 2022-08-19
        • 1970-01-01
        • 2023-02-20
        • 2021-12-26
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多