【问题标题】:AWS Sagemaker using parquet file for batch transform job?AWS Sagemaker 使用镶木地板文件进行批量转换作业?
【发布时间】:2020-10-06 11:01:57
【问题描述】:

我正在尝试使用 parquet 数据文件运行批量转换推理作业,但找不到任何东西。到处都说批量转换只接受 text/csv 或 json 格式类型。出于测试目的,我确实尝试在 AWS 账户中使用 lambda 函数来调用 parque 数据,但批量转换作业从未成功。出现 ClientError: 400,解析数据时出错。

request = \
        {
            "TransformJobName": batch_job_name,
            "ModelName": model_name,
            "BatchStrategy": "MultiRecord",
            "TransformOutput": {
                "S3OutputPath": batch_output
            },
            "TransformInput": {
                "DataSource": {
                    "S3DataSource": {
                        "S3DataType": "S3Prefix",
                        "S3Uri": batch_input

                    }
                },
                "ContentType": "application/x-parquet", 
                "SplitType": "Line",
                "CompressionType": "None"
            },
            "TransformResources": {
                "InstanceType": "ml.m4.xlarge",
                "InstanceCount": 1
            }
        }
    client.create_transform_job(**request)
    return "Done"

目前我正在尝试使用 parque 数据文件在本地运行 sagemaker 批量转换作业。我有可以在本地终端中运行以“服务”的 docker 映像,并且可以使用“localhost:8080/invocations”中的 REST API 服务 Postman 调用数据,使用“二进制”输入函数上传 parque 数据文件。它工作正常,我可以看到邮递员正文中填充的数据。但是,我无法使用 parque 数据进行批量转换。

有没有人成功使用 parquet 文件通过 sagemaker 批量转换进行转换和预测?

【问题讨论】:

    标签: amazon-web-services transform batch-processing parquet amazon-sagemaker


    【解决方案1】:

    Sagemaker 批量转换似乎不支持 parquet 格式,因此您必须有自己的解决方法才能使用 parquet 数据集。您可以将 parquet 数据集转换为推理端点支持的数据集(例如 text/csv 或 application/json),并在批量转换中使用此转换后的数据集。在火花集群中,您可以通过以下简单操作来做到这一点:

    sqlContext.read.parquet("input/parquet/location/").write.json("output/json/location")
    

    【讨论】:

      【解决方案2】:

      对于 SageMaker 批量转换(或任何服务作业),可以为训练方法提供自定义 input_fn,只要有自定义逻辑来处理它,它就可以使用任何其他类型的输入。我已经成功地将它用于 avro 输入。

      下面的内容应该适用于镶木地板文件:

      def input_fn(serialized_input_data, content_type="application/parquet"):
          logger.info("Deserializing input data.")
          ...
          # Process the serialized input.
          logger.info(f"Input deserialized.")
          return input_data
      

      有关自定义函数的更多信息here。这链接到 scikit-learn Estimator 页面,但我认为所有类型的 SageMaker Estimator 对象都支持它。

      【讨论】:

      • 分块serialized_input_data的逻辑是什么?假设您有一个大于 6MB 的 parquet 文件; SageMaker 如何拆分文件?如果它在 parquet 行组的中间分裂,您是否必须尽可能多地解析并将其余部分保存在字节缓冲区中,直到收到下一个块?
      【解决方案3】:

      有点晚了,但希望这对其他人有帮助。 只是补充一下@Setu Shah 提到的内容,这对我在 Sagemaker 中序列化和反序列化镶木地板文件有用:

      from io import BytesIO
      from typing import BinaryIO
      import pandas as pd
      from botocore.response import StreamingBody
      
      def input_fn(
        serialized_input_data: StreamingBody,
        content_type: str = "application/x-parquet",
      ) -> pd.DataFrame:
        """Deserialize inputs"""
        if content_type == "application/x-parquet":
          data = BytesIO(serialized_input_data)
          df = pd.read_parquet(data)
          return df
        else:
          raise ValueError(
            "Expected `application/x-parquet`."
          )
      
      def output_fn(output: pd.DataFrame, accept: str = "application/x-parquet") -> BinaryIO:
        """Model output handler"""
        if accept == "application/x-parquet":
          buffer = BytesIO()
          output.to_parquet(buffer)
          return buffer.getvalue()
        else:
          raise Exception("Requested unsupported ContentType in Accept: " + accept)
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2018-11-09
        • 2018-09-09
        • 2019-12-22
        • 2019-12-07
        • 2021-07-07
        • 2014-11-25
        • 2018-01-04
        相关资源
        最近更新 更多