【Question title】: How to create nested branches in metaflow?
【Posted】: 2021-04-30 11:28:32
【Question】:

I am using metaflow to create the following text-processing pipeline:

                                 ___F------
                     ______ D---|          |  
                    |           |___G---|  |__>  
          ____B-----|                   |----->H
         |          |______E_________________> ^
      A -|                                     |
         |____C________________________________|

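According to the documentation, branch lets steps run in parallel, so I use it to compute (B, C), (D, E) and (F, G) in parallel. Finally, all the branches are joined at H. Below is the code that implements this logic: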

from metaflow import FlowSpec, step

class TextProcessing(FlowSpec):

  @step
  def a(self):
    ....

    self.next(self.b, self.c)

  @step
  def c(self):
    result1 = {}

    ....

    self.next(self.join)

  @step
  def b(self):
    ....

    self.next(self.d, self.e)

  @step
  def e(self):
    result2 = []
    .....

    self.next(self.join)

  @step
  def d(self):
    ....

    self.next(self.f, self.g)

  @step
  def f(self):
    result3 = []
    ....

    self.next(self.join)

  @step
  def g(self):
    result4 = []
    .....

    self.next(self.join)


  @step
  def join(self, results):
    data = [results.c.result1, results.e.result2, results.f.result3, results.g.result4]
    print(data)

    self.next(self.end)

  @step
  def end(self):
    pass

etl = TextProcessing()

When running python main.py run, I get the following error:

Metaflow 2.2.10 executing TextProcessing for user:ubuntu
Validating your flow...
    Validity checker found an issue on line 83:
    Step join seems like a join step (it takes an extra input argument) but an incorrect number of steps (c, e, f, g) lead to it. This join was expecting 2 incoming paths, starting from splitted step(s) f, g.

Can anyone point out where I went wrong?

【Question discussion】:

    Tags: python netflix-metaflow


    【Solution 1】:

    After carefully re-reading the docs, I realized I was not handling the joins correctly. According to the documentation for metaflow 2.2.10:

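    Note that you can nest branches arbitrarily, that is, you can branch inside a branch. Just remember to join all the branches that you create.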

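    This means every branch has to be joined. To combine the values coming from the branches, metaflow provides the merge_artifacts utility function, which helps propagate unambiguous values.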
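    To make "propagate unambiguous values" concrete, here is a minimal pure-Python model of that rule. It is a simplified sketch, not Metaflow's implementation: merge_unambiguous is a hypothetical helper, each branch's artifacts are modeled as a plain dict, and values are compared with == (an assumption made for illustration). As I understand the Metaflow documentation, only artifacts not already set in the join step are considered, and an artifact whose value differs between incoming branches is an error.

```python
from typing import Any, Dict, Iterable, List, Optional


def merge_unambiguous(
    branches: List[Dict[str, Any]],
    already_set: Dict[str, Any],
    include: Optional[Iterable[str]] = None,
) -> Dict[str, Any]:
    """Hypothetical model of a join-step merge (NOT Metaflow's code).

    branches    -- one dict of artifact name -> value per incoming branch
    already_set -- artifacts the join step has already assigned itself
    include     -- if given, only these artifact names are considered
    Returns the artifacts that would be copied into the join step.
    """
    wanted = set(include) if include is not None else None
    merged: Dict[str, Any] = {}
    conflicts = set()
    for branch in branches:
        for name, value in branch.items():
            if wanted is not None and name not in wanted:
                continue  # not requested
            if name in already_set:
                continue  # values assigned in the join step are never overwritten
            if name in merged and merged[name] != value:
                conflicts.add(name)  # branches disagree, so the value is ambiguous
            merged.setdefault(name, value)
    if conflicts:
        raise ValueError(f"ambiguous artifacts: {sorted(conflicts)}")
    if wanted is not None:
        missing = sorted(n for n in wanted if n not in merged and n not in already_set)
        if missing:
            raise ValueError(f"requested artifacts not found: {missing}")
    return merged


if __name__ == "__main__":
    # 'shared' has the same value on both branches, so it is propagated.
    print(merge_unambiguous([{"shared": 1, "x": 2}, {"shared": 1}], already_set={}))
    # prints {'shared': 1, 'x': 2}

    # Mirrors merge_2 in the answer's code: 'result' is already set in the join,
    # so include=['result'] copies nothing.
    print(merge_unambiguous([{"result2": []}, {"result": {"result3": []}}],
                            already_set={"result": {}}, include=["result"]))
    # prints {}
```

    Under this model, the merge_artifacts(results, include=['result']) calls in the answer's merge_2 and merge_3 leave self.result exactly as it was assigned a line earlier; the explicit dictionary merge is what actually combines the branch outputs.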

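    Since the workflow splits three times (at A, B and D), I added three join steps to merge the results, innermost first: merge_1 joins F and G, merge_2 joins E with merge_1, and merge_3 joins C with merge_2.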
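    The "join every branch, innermost first" rule can be checked mechanically. Below is a minimal pure-Python sketch under simplifying assumptions; it is not Metaflow's validator, and the function name assign_joins and the graph encoding (a dict from each step to the list of steps passed to self.next, with the first step named start) are hypothetical. Walking from start, every split pushes itself onto a stack; all paths reaching a join must carry the same innermost split, and their number must equal that split's number of branches.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple


def assign_joins(graph: Dict[str, List[str]], joins: Set[str],
                 start: str = "start") -> Dict[str, str]:
    """Map each join step to the split step it closes, or raise ValueError.

    graph -- step name -> list of successor steps (the targets of self.next)
    joins -- names of the steps that take an extra inputs argument
    """
    # join name -> list of (predecessor, stack of open splits on that path)
    arrivals: Dict[str, List[Tuple[str, Tuple[str, ...]]]] = defaultdict(list)

    def walk(step: str, stack: Tuple[str, ...]) -> None:
        successors = graph.get(step, [])
        if len(successors) > 1:
            stack = stack + (step,)  # a split opens a new branch level
        for nxt in successors:
            if nxt in joins:
                first_arrival = not arrivals[nxt]
                arrivals[nxt].append((step, stack))
                if first_arrival and stack:
                    walk(nxt, stack[:-1])  # a join closes the innermost split
            else:
                walk(nxt, stack)

    walk(start, ())

    closes: Dict[str, str] = {}
    for join, incoming in arrivals.items():
        if any(not stack for _, stack in incoming):
            raise ValueError(f"{join}: reached by a path with no open split")
        splits = {stack[-1] for _, stack in incoming}
        if len(splits) != 1:
            raise ValueError(f"{join}: incoming paths come from different splits {sorted(splits)}")
        split = splits.pop()
        expected = len(graph[split])
        if len(incoming) != expected:
            raise ValueError(f"{join}: expected {expected} paths from {split}, got {len(incoming)}")
        closes[join] = split
    return closes


if __name__ == "__main__":
    # The three-join flow from this answer, with step A named start.
    fixed = {
        "start": ["b", "c"], "b": ["d", "e"], "d": ["f", "g"],
        "c": ["merge_3"], "e": ["merge_2"], "f": ["merge_1"], "g": ["merge_1"],
        "merge_1": ["merge_2"], "merge_2": ["merge_3"], "merge_3": ["end"],
    }
    print(assign_joins(fixed, {"merge_1", "merge_2", "merge_3"}))
    # prints {'merge_1': 'd', 'merge_2': 'b', 'merge_3': 'start'}

    # The question's flow: c, e, f and g all lead to the single step join.
    original = {
        "start": ["b", "c"], "b": ["d", "e"], "d": ["f", "g"],
        "c": ["join"], "e": ["join"], "f": ["join"], "g": ["join"],
        "join": ["end"],
    }
    try:
        assign_joins(original, {"join"})
    except ValueError as err:
        print("invalid:", err)
```

    In the question's graph the single join is reached from three different open splits (D via f and g, B via e, A via c), which is the same imbalance Metaflow's validity checker reports; giving each split its own join resolves it.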

    The following changes worked for me:

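    from metaflow import FlowSpec, step
    
    class TextProcessing(FlowSpec):
    
      # Step A in the diagram. Metaflow requires the first step of a flow to be named start.
      @step
      def start(self):
        ...  # processing logic elided
    
        self.next(self.b, self.c)  # split 1: A -> (B, C)
    
      @step
      def c(self):
        # Assign to self so the value becomes an artifact that the join step
        # can read as results.c.result1 (a local variable would be lost).
        self.result1 = {}
        ...  # processing logic elided
    
        self.next(self.merge_3)
    
      @step
      def b(self):
        ...  # processing logic elided
    
        self.next(self.d, self.e)  # split 2: B -> (D, E)
    
      @step
      def e(self):
        self.result2 = []
        ...  # processing logic elided
    
        self.next(self.merge_2)
    
      @step
      def d(self):
        ...  # processing logic elided
    
        self.next(self.f, self.g)  # split 3: D -> (F, G)
    
      @step
      def f(self):
        self.result3 = []
        ...  # processing logic elided
    
        self.next(self.merge_1)
    
      @step
      def g(self):
        self.result4 = []
        ...  # processing logic elided
    
        self.next(self.merge_1)
    
      @step
      def merge_1(self, results):
        # Joins split 3 (F, G).
        self.result = {
          'result4': results.g.result4,
          'result3': results.f.result3
        }
    
        self.next(self.merge_2)
    
      @step
      def merge_2(self, results):
        # Joins split 2 (E and the output of merge_1).
        self.result = {'result2': results.e.result2, **results.merge_1.result}
        # merge_artifacts only considers artifacts not already set in this
        # step, so it leaves self.result as assigned above.
        self.merge_artifacts(results, include=['result'])
        self.next(self.merge_3)
    
      @step
      def merge_3(self, results):
        # Joins split 1 (C and the output of merge_2).
        self.result = {'c': results.c.result1, **results.merge_2.result}
        self.merge_artifacts(results, include=['result'])
        self.next(self.end)
    
      @step
      def end(self):
        print(self.result)
    
    if __name__ == '__main__':
      TextProcessing()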
    
