【问题标题】:How to use Chunk Size for kedro.extras.datasets.pandas.SQLTableDataSet in the kedro pipeline?如何在 kedro 管道中使用 kedro.extras.datasets.pandas.SQLTableDataSet 的块大小?
【发布时间】:2021-05-13 15:24:20
【问题描述】:

我正在使用 kedro.extras.datasets.pandas.SQLTableDataSet 并希望使用 pandas 的 chunk_size 参数。但是,在运行管道时,表会被视为生成器,而不是 pd.dataframe()。

您将如何在管道中使用 chunk_size?

我的目录:

table_name:
  type: pandas.SQLTableDataSet
  credentials: redshift
  table_name : rs_table_name
  layer: output
  save_args:
    if_exists: append
    schema: schema.name
    chunk_size: 1000

【问题讨论】:

    标签: kedro


    【解决方案1】:

    查看最新的pandas doc,实际使用的kwargchunksize,而不是chunk_size。请参阅https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html。 由于kedro 仅包装您的save_args 并将它们传递给pd.DataFrame.to_sql,因此这些需要匹配:

    def _save(self, data: pd.DataFrame) -> None:
        try:
            data.to_sql(**self._save_args)
        except ImportError as import_error:
            raise _get_missing_module_error(import_error) from import_error
        except NoSuchModuleError as exc:
            raise _get_sql_alchemy_missing_error() from exc
    

    编辑:一旦您在管道中使用此功能,文档显示带有chunksize 设置的pandas.DataFrame.read_sql 将返回类型Iterator[DataFrame]。这意味着在您的节点函数中,您应该遍历输入(并在适当的情况下进行相应的注释),例如:

    def my_node_func(input_dfs: Iterator[pd.DataFrame], *args):
      for df in input_dfs:
        ...
    

    这适用于最新版本的pandas。但是,我注意到 pandas 正在调整 API,以便 read_csvchunksize 设置从 pandas>=1.2 返回 ContextManager,所以我希望这种更改也会发生在 read_sql 中。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多