【Question title】: Dagster: Multiple and Conditional Outputs (Type check failed for step output xxx PySparkDataFrame)
【Posted】: 2020-12-20 19:06:00
【Question description】:

I am working through the Dagster tutorial, but I am stuck on the Multiple and Conditional Outputs step.

The solid definition there asks you to declare (among other things):

output_defs=[
    OutputDefinition(
        name="hot_cereals", dagster_type=DataFrame, is_required=False
    ),
    OutputDefinition(
        name="cold_cereals", dagster_type=DataFrame, is_required=False
    ),
],

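But nothing says where DataFrame comes from. First I tried pandas.DataFrame, but I got the error: {dagster_type} is not a valid dagster type. This happens when I try to load it with $ dagit -f multiple_outputs.py. Then I installed dagster_pyspark and tried dagster_pyspark.DataFrame. This time I managed to get the DAG into the UI. However, when I run it from the UI, I get the following error: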

dagster.core.errors.DagsterTypeCheckDidNotPass: Type check failed for step output hot_cereals of type PySparkDataFrame.
  File "/Users/bambrozio/.local/share/virtualenvs/dagster-tutorial/lib/python3.7/site-packages/dagster/core/execution/plan/execute_plan.py", line 210, in _dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/Users/bambrozio/.local/share/virtualenvs/dagster-tutorial/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 273, in core_dagster_event_sequence_for_step
    for evt in _create_step_events_for_output(step_context, user_event):
  File "/Users/bambrozio/.local/share/virtualenvs/dagster-tutorial/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 298, in _create_step_events_for_output
    for output_event in _type_checked_step_output_event_sequence(step_context, output):
  File "/Users/bambrozio/.local/share/virtualenvs/dagster-tutorial/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 221, in _type_checked_step_output_event_sequence
    dagster_type=step_output.dagster_type,

Does anyone know how to solve this? Thanks for your help!

【Question discussion】:

    Tags: dagster


    【Solution 1】:

    As Arthur pointed out, the full tutorial code is available on Dagster's GitHub.

    However, you don't need dagster_pandas. The key lines missing from your code are:

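    import typing
    from typing import Any  # referenced by the "# type: Any" comment below

    from dagster import PythonObjectDagsterType

    if typing.TYPE_CHECKING:
        # Static checkers such as MyPy see DataFrame as a plain list.
        DataFrame = list
    else:
        # At runtime DataFrame is a Dagster type wrapping list.
        DataFrame = PythonObjectDagsterType(list, name="DataFrame")  # type: Any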
    

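    The reason for this construct is MyPy compliance: under typing.TYPE_CHECKING the name DataFrame is just list, while at runtime it is the Dagster type. See the Types & Expectations section of the tutorial.

    See also the documentation on Dagster types.

    【Discussion】: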
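To make the cause of the error concrete: as far as I understand it, a type built with PythonObjectDagsterType(list, ...) boils down to an isinstance check against list, and the tutorial solids emit plain lists of dict rows, so a type that expects a PySpark DataFrame will reject them. Below is a minimal pure-Python sketch of that idea. IsInstanceType and FakeSparkDataFrame are hypothetical stand-ins invented for illustration (they are not Dagster or PySpark classes), and the cereal row is made up.

```python
class IsInstanceType:
    """Hypothetical stand-in for a type whose check is a plain isinstance test."""

    def __init__(self, python_type, name):
        self.python_type = python_type
        self.name = name

    def check(self, value):
        # Passes only when the value is an instance of the wrapped Python type.
        return isinstance(value, self.python_type)


class FakeSparkDataFrame:
    """Placeholder playing the role of a PySpark DataFrame class (assumption)."""


# The type from the answer above: a "DataFrame" that is really a list.
ListFrame = IsInstanceType(list, name="DataFrame")
# The type the question ended up using.
SparkFrame = IsInstanceType(FakeSparkDataFrame, name="PySparkDataFrame")

# The tutorial solid yields a list of dict rows (this row is made up).
hot_cereals = [{"name": "example_hot", "type": "H", "calories": 100}]

list_ok = ListFrame.check(hot_cereals)    # a list satisfies the list-based type
spark_ok = SparkFrame.check(hot_cereals)  # a list is not a Spark DataFrame

print(list_ok, spark_ok)
```

The second check failing is the situation reported as "Type check failed for step output hot_cereals of type PySparkDataFrame": the declared type and the yielded value simply do not match.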

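      【Solution 2】:

      I was stuck here too, but luckily I found the updated source code. They updated the docs so that the OutputDefinition is defined up front.

      Update the part of your code that comes before the sorting solids and the pipeline as follows: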

      import csv
      import os
      
      from dagster import (
          Bool,
          Field,
          Output,
          OutputDefinition,
          execute_pipeline,
          pipeline,
          solid,
      )
      
      
      @solid
      def read_csv(context, csv_path):
          lines = []
          csv_path = os.path.join(os.path.dirname(__file__), csv_path)
          with open(csv_path, "r") as fd:
              for row in csv.DictReader(fd):
                  row["calories"] = int(row["calories"])
                  lines.append(row)
      
          context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
          return lines
      
      
      @solid(
          config_schema={
              "process_hot": Field(Bool, is_required=False, default_value=True),
              "process_cold": Field(Bool, is_required=False, default_value=True),
          },
          output_defs=[
              OutputDefinition(name="hot_cereals", is_required=False),
              OutputDefinition(name="cold_cereals", is_required=False),
          ],
      )
      def split_cereals(context, cereals):
          if context.solid_config["process_hot"]:
              hot_cereals = [cereal for cereal in cereals if cereal["type"] == "H"]
              yield Output(hot_cereals, "hot_cereals")
          if context.solid_config["process_cold"]:
              cold_cereals = [cereal for cereal in cereals if cereal["type"] == "C"]
              yield Output(cold_cereals, "cold_cereals")
      

      You can also find the complete code here.

      【Discussion】:
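If the conditional part is what is unclear, here is a small sketch of the same split logic without Dagster, so it can be run on its own. It is only an illustration under assumptions: the config flags become ordinary keyword arguments, each Output becomes a (name, value) tuple, and the rows are made up rather than taken from the tutorial CSV.

```python
def split_cereals(cereals, process_hot=True, process_cold=True):
    """Yield (output_name, rows) only for the branches that are enabled."""
    if process_hot:
        yield "hot_cereals", [c for c in cereals if c["type"] == "H"]
    if process_cold:
        yield "cold_cereals", [c for c in cereals if c["type"] == "C"]


# Made-up rows, shaped like what read_csv returns in the answer above.
rows = [
    {"name": "example_hot", "type": "H", "calories": 100},
    {"name": "example_cold_a", "type": "C", "calories": 70},
    {"name": "example_cold_b", "type": "C", "calories": 120},
]

# Both flags on: both outputs are produced.
both = dict(split_cereals(rows))

# Hot branch switched off: only the cold output is produced.
cold_only = dict(split_cereals(rows, process_hot=False))

print(sorted(both))
print(sorted(cold_only))
```

Since both outputs are declared with is_required=False in the solid, skipping one of the yields is allowed, which is what the flags control.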

        【Solution 3】:

        First, try installing the Dagster pandas integration:

        pip install dagster_pandas
        

        Then do:

        from dagster_pandas import DataFrame
        

        You can find the code in the tutorial here.

        【Discussion】:
