【问题标题】:Python: existing solution for branching workflow/pipeline?Python:用于分支工作流/管道的现有解决方案?
【发布时间】:2012-03-09 17:02:53
【问题描述】:

在我的应用程序中,我实现了一个由 5 个不同“处理单元”组成的非常粗略的工作流程。目前的代码结构如下:

def run(self, result_first_step=None, result_second_step=None):

    config = read_workflow_config("config.ini")

    if config.first_step:
        result_first_step = run_process_1()

    if config.second_step and result_first_step is not None:
        result_second_step = run_process_2(result_first_step)
    else:
        raise Exception("Missing required data")

    if config.third_step:
        result_third_step = run_process_3(result_first_step, result_second_step)
    else:
        result_third_step = None

    collect_results(result_first_step, result_second_step, result_third_step)

等等。代码可以工作,但它很难维护并且非常脆弱(处理比这个简化的例子复杂得多)。所以,我一直在考虑采用另一种策略,即通过以下方式制定适当的工作流程:

  • 短路:我可以不给启动过程数据,或者两种不同类型的数据。在后一种情况下,工作流会短路并跳过一些处理
  • 通用对象:所有单元都可以使用类似配置的东西
  • 条件:根据配置,有些位可能会被跳过

是否有可用的 Python 库来执行这些类型的工作流,还是我应该自己开发?我一直在尝试 pyutilib.workflow,但它不能正确支持通用配置对象,除非将其传递给所有工作人员(乏味)。

注意:这是针对库/命令行应用程序的,因此基于 Web 的工作流解决方案不合适。

【问题讨论】:

  • 你试过用谷歌搜索这个问题吗?您的发现有什么问题?
  • 按照你写的方式,你不能run_process_2,除非你已经run_process_1。这是真的吗?
  • 确实,我会调整它以更好地展示我的想法。编辑:更改示例显示如何短路。
  • @Marcin 这不是我第一次在谷歌上搜索这个答案,大多数解决方案要么是过度设计的、基于网络的(不,不),要么不提供我需要的东西。
  • @Einar 如果您单独解释现有解决方案的问题,将会很有帮助。

标签: python workflow pipeline


【解决方案1】:

你可以把 run 方法变成一个生成器;

def run(self)
  result_first_step = run_process_1()
  yield result_first_step
  result_second_step = run_process_2(result_first_step)
  yield result_second_step
  result_third_step = run_process_3(result_first_step, result_second_step)
  collect_results(result_first_step, result_second_step, result_third_step)

【讨论】:

    【解决方案2】:

    Python 中有多种管道方法, 从半页到...
    这是主要思想: 在顶部,将所有步骤定义放在一个字典中;
    然后管道(例如“C A T”)执行步骤 C、A、T。

    class Pipelinesimple:
        """p = Pipelinesimple( funcdict );  p.run( "C A T" ) = C(X) | A | T
    
        funcdict = dict( A = Afunc, B = Bfunc ... Z = Zfunc )
        pipeline = Pipelinesimple( funcdict )
        cat = pipeline.run( "C A T", X )  # C(X) | A | T, i.e. T( A( C(X) ))
        dog = pipeline.run( "D O G", X, **kw )  # D(X, **kw) | O(**kw) | G(**kw)
        """
    
    def __init__( self, funcdict ):
        self.funcdict = funcdict  # funcs or functors of X
    
    def run( self, steps, X, **commonargs ):
        """ steps "C A T" or ["C", "A", "T"]
            all funcs( X, **commonargs )
        """
    
        if isinstance( steps, basestring ):
            steps = steps.split()  # "C A T" -> ["C", "A", "T"]
        for step in steps:
            func = self.funcdict(step)
            # if X is None: ...
            X = func( X, **commonargs )
        return X
    

    接下来,有几种方法可以给出不同的参数 到不同的步骤。

    一种方法是解析多行字符串,例如

    """ C  ca=5  cb=6 ...
        A  aa=1 ...
        T  ...
    """
    

    另一个是获取函数列表/函数名称/参数字典,比如

    pipeline.run( ["C", dict(ca=5, cb=6), lambda ..., "T", dict(ta=3) ])
    

    第三种是拆分参数“A__aa B__ba ...”的方式 sklearn.pipeline.Pipeline。 做。 (这适用于机器学习,但您可以复制管道部分。)

    每一个都有相当明显的优缺点。

    一大群有才华的人可以想出十几个原型 非常快速地解决问题 [管道]。
    但是将十几个减少到两三个需要很长时间。

    无论你采取哪种方式, 提供一种记录运行的所有参数的方法。

    另请参阅:
    FilterPype
    nipype

    【讨论】:

    • 有趣的方法,我将分支我的代码并试一试。
    • @Einar,您将采用 3 种方法中的哪一种?
    猜你喜欢
    • 2021-05-03
    • 1970-01-01
    • 2018-12-07
    • 1970-01-01
    • 1970-01-01
    • 2019-09-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多