【问题标题】:Dagster start pipeline from another pipeline using its outputsDagster 使用其输出从另一个管道启动管道
【发布时间】:2021-06-14 18:22:33
【问题描述】:

我应该如何在管道 A 完成后启动管道 B,并将管道 A 的输出用于管道 B?

以一段代码为起点:

from dagster import InputDefinition, Nothing, OutputDefinition, pipeline, solid

@solid
def pipeline1_task1(context) -> Nothing:
    context.log.info('in pipeline 1 task 1')


@solid(input_defs=[InputDefinition("start", Nothing)],
       output_defs=[OutputDefinition(str, 'some_str')])
def pipeline1_task2(context) -> str:
    context.log.info('in pipeline 1 task 2')
    return 'my cool output'


@pipeline
def pipeline1():
    pipeline1_task2(pipeline1_task1())


@solid(input_defs=[InputDefinition("print_str", str)])
def pipeline2_task1(context, print_str) -> Nothing:
    context.log.info('in pipeline 2 task 1' + print_str)


@solid(input_defs=[InputDefinition("start", Nothing)])
def pipeline2_task2(context) -> Nothing:
    context.log.info('in pipeline 2 task 2')


@pipeline
def pipeline2():
    pipeline2_task2(pipeline2_task1())


if __name__ == '__main__':
    # run pipeline 1
    # store outputs
    # call pipeline 2 using the above outputs

这里我们有三个管道:pipeline1 有两个实体,可能会做我们想做的任何事情并从第二个实体返回输出。 pipeline2 应该使用pipeline1_task2 的输出,最终做另一件工作并打印第一个管道的输出。

我应该如何“连接”两条管道?

【问题讨论】:

    标签: python pipeline dagster


    【解决方案1】:

    让一个管道在另一个管道之后执行的一种方法是通过传感器。在 Dagster 中推荐的方法是使用“资产传感器”。第一个管道中的实体生成 AssetMaterialization,第二个管道中的传感器等待该资产实现。

    这是一个例子:https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#asset-sensors

    【讨论】:

      【解决方案2】:

      玩了一会儿之后,我得出了以下解决方案(我认为不是太优雅,但至少它有效):

      from dagster import (InputDefinition, OutputDefinition,
                           execute_pipeline, pipeline, solid, Nothing, repository)
      
      
      @solid
      def pipeline1_task1(context) -> Nothing:
          context.log.info('in pipeline 1 task 1')
      
      
      @solid(input_defs=[InputDefinition("start", Nothing)],
      output_defs=[OutputDefinition(str, 'some_str')])
      def pipeline1_task2(context) -> str:
          context.log.info('in pipeline 1 task 2')
          return '\n\n\nmy cool output\n\n\n'
      
      
      @pipeline
      def pipeline1():
          pipeline1_task2(pipeline1_task1())
      
      
      @solid(input_defs=[InputDefinition("print_str", str)])
      def pipeline2_task1(context, print_str) -> Nothing:
          context.log.info('in pipeline 2 task 1' + print_str)
      
      
      @solid(input_defs=[InputDefinition("start", Nothing)])
      def pipeline2_task2(context) -> Nothing:
          context.log.info('in pipeline 2 task 2')
      
      
      @pipeline
      def pipeline2():
          pipeline2_task2(pipeline2_task1())
      
      
      @solid
      def run_pipelines(context):
          pout = execute_pipeline(pipeline1)
          some_str = pout.result_for_solid('pipeline1_task2')
          conf = {'solids': {'pipeline2_task1': {'inputs': {'print_str': some_str.output_value('some_str')}}}}
          execute_pipeline(pipeline2, run_config=conf)
      
      @pipeline
      def pipeline3():
          run_pipelines()
      
      
      @repository
      def repo():
          return [pipeline1, pipeline2, pipeline3]
      
      if __name__ == '__main__':
          execute_pipeline(pipeline3)
      

      所以...在这里我定义了pipeline3,而不是在底部条件中做所有事情。管道 3 只有一个实体,它执行 pipeline1 并获取实体 pipeline1_task2 的输出。然后它会创建一个包含该输出的配置some_str,并将此配置传递给第二个管道的execute_pipeline

      在这里,我们还定义了一个 @repository 函数,它是 Dagster 确定所有三个管道是一个整体的一部分所必需的。

      整个事情在dagit 中很好地可视化。尽管每个管道都与其他管道分开显示,但三个管道显示在一个存储库中(如代码中所定义)。

      【讨论】:

        猜你喜欢
        • 2021-03-01
        • 1970-01-01
        • 2020-06-14
        • 1970-01-01
        • 2021-01-13
        • 1970-01-01
        • 2020-10-13
        • 2018-11-05
        • 2022-06-16
        相关资源
        最近更新 更多