【Question title】: [Sagemaker - batch transform] Internal server error: 500
【Posted】: 2021-07-30 00:43:09
【Question】:

I am trying to run a batch transform on a training dataset stored in an S3 bucket. I followed this link: https://github.com/aws-samples/quicksight-sagemaker-integration-blog

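The training data being transformed is about 35 MB. I get these errors:

  1. Bad HTTP status received from algorithm: 500
  2. The server encountered an internal error and was unable to complete your request. Either the server is overloaded or there is an error in the application.

Steps followed: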

1. s3_input_train = sagemaker.TrainingInput(s3_data='s3://{}/{}/rawtrain/'.format(bucket, prefix), content_type='csv')

2. from sagemaker.sklearn.estimator import SKLearn
sagemaker_session = sagemaker.Session()
script_path = 'preprocessing.py'
sklearn_preprocessor = SKLearn(
    entry_point=script_path,
    role=role,
    instance_type="ml.c4.xlarge",  # SDK v2 name; SDK v1 called this train_instance_type
    framework_version='0.20.0',
    py_version='py3',
    sagemaker_session=sagemaker_session)
sklearn_preprocessor.fit({'train': s3_input_train})

3. transform_train_output_path = 's3://{}/{}/{}/'.format(bucket, prefix, 'transformtrain-train-output')
scikit_learn_inference_model = sklearn_preprocessor.create_model(env={'TRANSFORM_MODE': 'feature-transform'})
transformer_train = scikit_learn_inference_model.transformer(
    instance_count=1,
    assemble_with='Line',
    output_path=transform_train_output_path,
    accept='text/csv',
    strategy="MultiRecord",
    max_payload=40,
    instance_type='ml.m4.xlarge')

4. Preprocess training input
transformer_train.transform(s3_input_train.config['DataSource']['S3DataSource']['S3Uri'], 
                            content_type='text/csv',
                            split_type = "Line")
print('Waiting for transform job: ' + transformer_train.latest_transform_job.job_name)
transformer_train.wait()
preprocessed_train_path = transformer_train.output_path + transformer_train.latest_transform_job.job_name
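
For context on the `strategy="MultiRecord"`, `split_type="Line"` and `max_payload=40` settings above: with this combination, batch transform splits each input object on newlines and packs as many records as fit under the payload limit into one request. The sketch below imitates that packing locally with the standard library only. `pack_records` and the sample rows are hypothetical, and this is not SageMaker's actual implementation; it only helps estimate how large each request to the container may become.

```python
def pack_records(lines, max_payload_bytes):
    """Greedily group newline-terminated records into mini-batches under a byte limit."""
    batches, current, current_size = [], [], 0
    for line in lines:
        record = line if line.endswith("\n") else line + "\n"
        size = len(record.encode("utf-8"))
        if size > max_payload_bytes:
            raise ValueError("a single record exceeds the payload limit")
        # Flush the current batch when adding this record would overflow it.
        if current and current_size + size > max_payload_bytes:
            batches.append("".join(current))
            current, current_size = [], 0
        current.append(record)
        current_size += size
    if current:
        batches.append("".join(current))
    return batches


# Hypothetical sample: 5 CSV rows of 10 bytes each (9 characters + newline).
rows = ["a,1.0,2.0"] * 5
batches = pack_records(rows, max_payload_bytes=25)
print([len(b.encode("utf-8")) for b in batches])
```

By the same logic, a 35 MB input with a 40 MB limit would fit into a single request, so the container would have to parse and transform every row in one call.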

preprocessing.py

from __future__ import print_function

import time
import sys
from io import StringIO
import os
import shutil

import argparse
import csv
import json
import numpy as np
import pandas as pd
import logging

from sklearn.compose import ColumnTransformer
from sklearn.externals import joblib
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import Binarizer, StandardScaler, OneHotEncoder

from sagemaker_containers.beta.framework import (
    content_types, encoders, env, modules, transformer, worker)

# Specifying the column names here.
feature_columns_names = [
    'A',
    'B',
    'C',
    'D',
    'E',
    'F',
    'G',
    'H',
    'I',
    'J',
    'K'
] 

label_column = 'ab'

feature_columns_dtype = {
    'A' :  str,
    'B' :  np.float64,
    'C' :  np.float64,
    'D' :  str,
    "E" :  np.float64,
    'F' :  str,
    'G' :  str,
    'H' :  np.float64,
    'I' :  str,
    'J' :  str,
    'K':  str,
}

label_column_dtype = {'ab': np.int32}  

def merge_two_dicts(x, y):
    z = x.copy()   # start with x's keys and values
    z.update(y)    # modifies z with y's keys and values & returns None
    return z

def _is_inverse_label_transform():
    """Returns True if it's running in inverse label transform mode."""
    return os.getenv('TRANSFORM_MODE') == 'inverse-label-transform'

def _is_feature_transform():
    """Returns True if it's running in feature transform mode."""
    return os.getenv('TRANSFORM_MODE') == 'feature-transform'


if __name__ == '__main__':

    parser = argparse.ArgumentParser()

    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])


    args = parser.parse_args()

    input_files = [ os.path.join(args.train, file) for file in os.listdir(args.train) ]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(args.train, "train"))

    raw_data = [ pd.read_csv(
        file, 
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)) for file in input_files ]
    concat_data = pd.concat(raw_data)
    
    numeric_features = list([
    'B',
    'C',
    'E',
    'H'
    ])


    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())])

    categorical_features = list(['A','D','F','G','I','J','K'])
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)],
        remainder="drop")

    preprocessor.fit(concat_data)

    joblib.dump(preprocessor, os.path.join(args.model_dir, "model.joblib"))

    print("saved model!")
    
    
def input_fn(input_data, request_content_type):
    """Parse input data payload
    
    We currently only take csv input. Since we need to process both labelled
    and unlabelled data we first determine whether the label column is present
    by looking at how many columns were provided.
    """
    
    
    content_type = request_content_type.lower() if request_content_type else "text/csv"
    content_type = content_type.split(";")[0].strip()
    
    
    if isinstance(input_data, str):
        str_buffer = input_data
    else:
        str_buffer = str(input_data,'utf-8')
    

    if _is_feature_transform():
        if content_type == 'text/csv':
            # Read the raw input data as CSV.
            df = pd.read_csv(StringIO(str_buffer), header=None)
            if len(df.columns) == len(feature_columns_names) + 1:
                # This is a labelled example, includes the  label
                df.columns = feature_columns_names + [label_column]
            elif len(df.columns) == len(feature_columns_names):
                # This is an unlabelled example.
                df.columns = feature_columns_names
            return df
        else:
            raise ValueError("{} not supported by script!".format(content_type))
    
    
    if _is_inverse_label_transform():
        if content_type == 'text/csv':
            # Read the raw input data as CSV.
            df = pd.read_csv(StringIO(str_buffer), header=None)
            if len(df.columns) == len(feature_columns_names) + 1:
                # This is a labelled example, includes the label
                df.columns = feature_columns_names + [label_column]
            elif len(df.columns) == len(feature_columns_names):
                # This is an unlabelled example.
                df.columns = feature_columns_names
            return df
        else:
            raise ValueError("{} not supported by script!".format(content_type))
            
            
def output_fn(prediction, accept):
    """Format prediction output
    
    The default accept/content-type between containers for serial inference is JSON.
    We also want to set the ContentType or mimetype as the same value as accept so the next
    container can read the response payload correctly.
    """
    
    accept = 'text/csv'
    if type(prediction) is not np.ndarray:
        prediction=prediction.toarray()
    
   
    if accept == "application/json":
        instances = []
        for row in prediction.tolist():
            instances.append({"features": row})

        json_output = {"instances": instances}

        return worker.Response(json.dumps(json_output), mimetype=accept)
    elif accept == 'text/csv':
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept)
    else:
        raise RuntimeError("{} accept type is not supported by this script.".format(accept))


def predict_fn(input_data, model):
    """Preprocess input data
    
    We implement this because the default predict_fn uses .predict(), but our model is a preprocessor
    so we want to use .transform().

    The output is returned in the following order:
    
        rest of features either one hot encoded or standardized
    """

    
    if _is_feature_transform():
        features = model.transform(input_data)


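        if label_column in input_data:
            # Return the label (as the first column) and the set of features.
            # NOTE: ['True.'] only exists if the label column contains the value 'True.';
            # for any other label values this lookup raises KeyError.
            return np.insert(features.toarray(), 0, pd.get_dummies(input_data[label_column])['True.'], axis=1)
        else:
            # Return only the set of features
            return features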
    
    if _is_inverse_label_transform():
        features = input_data.iloc[:,0]>0.5
        features = features.values
        return features
    

def model_fn(model_dir):
    """Deserialize fitted model
    """
    if _is_feature_transform():
        preprocessor = joblib.load(os.path.join(model_dir, "model.joblib"))
        return preprocessor
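
The parsing rules that `input_fn` describes (normalize the content type, decode bytes, then infer from the column count whether the label is present) can be exercised without a container. Below is a minimal sketch using only the standard library instead of pandas. `normalize_content_type`, `parse_csv_payload` and the shortened three-name feature list are hypothetical stand-ins, not part of the script above.

```python
import csv
from io import StringIO

FEATURES = ["A", "B", "C"]   # hypothetical, shortened feature list
LABEL = "ab"                 # label column name used in the script


def normalize_content_type(raw):
    """Lowercase the header and drop parameters such as '; charset=utf-8'."""
    return (raw or "text/csv").lower().split(";")[0].strip()


def parse_csv_payload(payload, content_type):
    """Return (column_names, rows) for a CSV payload given as str or bytes."""
    if normalize_content_type(content_type) != "text/csv":
        raise ValueError("{} not supported".format(content_type))
    text = payload if isinstance(payload, str) else payload.decode("utf-8")
    rows = [row for row in csv.reader(StringIO(text)) if row]
    width = len(rows[0]) if rows else 0
    if width == len(FEATURES) + 1:
        columns = FEATURES + [LABEL]      # labelled example
    elif width == len(FEATURES):
        columns = list(FEATURES)          # unlabelled example
    else:
        raise ValueError("unexpected column count: {}".format(width))
    return columns, rows


columns, rows = parse_csv_payload(b"x,1.5,2.5,1\ny,3.5,4.5,0\n",
                                  "text/csv; charset=utf-8")
print(columns)
```

Unlike the original `input_fn`, this sketch raises on an unexpected column count instead of returning a frame with unnamed columns, which makes a malformed payload easier to spot.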

Please help.

【Question discussion】:

    Tags: amazon-sagemaker


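    【Answer 1】:

    As far as I can see, you are following a rather old post, and there are open issues on GitHub about its input source and configuration.

    I encourage you to look at the latest examples, which show how to visualize Amazon SageMaker machine learning predictions with Amazon QuickSight.

    Also, if the problem persists, please open a service request with AWS Support and include the job ARN so that the internal server error can be investigated further.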
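
Before opening a support case, it may also help to reproduce the failure locally. SageMaker framework containers call `input_fn`, `predict_fn` and `output_fn` in that order by default, and an exception raised in any of them can surface as an HTTP 500. The sketch below is a hypothetical harness (`run_chain_locally` and the toy handlers are illustrative, not SageMaker APIs) that runs such a chain and returns the traceback instead of an opaque status code.

```python
import traceback


def run_chain_locally(payload, content_type, accept, model,
                      input_fn, predict_fn, output_fn):
    """Run the handlers in serving order; return (status, body)."""
    try:
        data = input_fn(payload, content_type)
        prediction = predict_fn(data, model)
        return 200, output_fn(prediction, accept)
    except Exception:
        # Assumption: the container would report this case as a 500.
        return 500, traceback.format_exc()


# Toy handlers standing in for the ones in preprocessing.py.
def toy_input_fn(payload, content_type):
    return [line.split(",") for line in payload.strip().splitlines()]


def toy_predict_fn(rows, model):
    # Looks up a label value that may not exist, as a failing example.
    return [model[row[-1]] for row in rows]


def toy_output_fn(prediction, accept):
    return "\n".join(str(p) for p in prediction)


label_map = {"True.": 1, "False.": 0}   # hypothetical label encoding
ok = run_chain_locally("a,True.\nb,False.", "text/csv", "text/csv",
                       label_map, toy_input_fn, toy_predict_fn, toy_output_fn)
bad = run_chain_locally("a,1\nb,0", "text/csv", "text/csv",
                        label_map, toy_input_fn, toy_predict_fn, toy_output_fn)
print(ok[0], bad[0])
```

To try this with the real script, one would pass the handlers and loaded model from `preprocessing.py` in an environment that has the container's dependencies, with `TRANSFORM_MODE` set beforehand.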

    【Discussion】:
