【问题标题】:SparkJarProcessor in Sagemaker PipelineSagemaker 管道中的 SparkJarProcessor
【发布时间】:2023-01-16 22:06:51
【问题描述】:

我想在 Sagemaker Pipeline 中运行 SparkJarProcessor。在创建 SparkJarProcessor 实例后,当我只是 run 处理器时,我可以使用 submit_appsubmit_class 参数指定要执行的 jar 和类到 run 方法。例如。,

processor.run(
    submit_app="my.jar",
    submit_class="program.to.run",
    arguments=['--my_arg', "my_arg"],
    configuration=my_config,
    spark_event_logs_s3_uri=log_path
)

如果我想将它作为管道中的一个步骤运行,我可以为 ProcessingStep 提供哪些参数?根据this documentation,您可以在处理器上调用 get_run_args 以“获取在 ProcessingStep 中使用 SparkJarProcessor 时所需的规范化输入、输出和参数“,但是当我这样运行它时,

processor.get_run_args(
    submit_app="my.jar", 
    submit_class="program.to.run",
    arguments=['--my_arg', "my_arg"],
    configuration=my_config,
    spark_event_logs_s3_uri=log_path
)

我的输出如下所示:

RunArgs(code='my.jar', inputs=[<sagemaker.processing.ProcessingInput object at 0x7fc53284a090>], outputs=[<sagemaker.processing.ProcessingOutput object at 0x7fc532845ed0>], arguments=['--my_arg', 'my_arg'])

“program.to.run”不是输出的一部分。那么,假设code是指定jar,那么submit_class的规范化版本是什么?

【问题讨论】:

    标签: amazon-web-services apache-spark amazon-sagemaker


    【解决方案1】:

    当在 SparkJarProcessor 上调用 get_run_argsrun 时,submit_class is used to set a property on the processor itself 这就是您在 get_run_args 输出中看不到它的原因。

    该处理器属性将在管道定义生成期间用于将 ContainerEntrypoint 参数设置为 CreateProcessingJob

    例子:

    run_args = spark_processor.get_run_args(
        submit_app="my.jar",
        submit_class="program.to.run",
        arguments=[]
    )
    
    step_process = ProcessingStep(
        name="SparkJarProcessStep",
        processor=spark_processor,
        inputs=run_args.inputs,
        outputs=run_args.outputs,
        code=run_args.code
    )
    
    pipeline = Pipeline(
        name="myPipeline",
        parameters=[],
        steps=[step_process],
    )
    
    definition = json.loads(pipeline.definition())
    definition
    

    definition的输出:

    ...
    'Steps': [{'Name': 'SparkJarProcessStep',
       'Type': 'Processing',
       'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
          'InstanceCount': 2,
          'VolumeSizeInGB': 30}},
        'AppSpecification': {'ImageUri': '153931337802.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:2.4-cpu',
         'ContainerEntrypoint': ['smspark-submit',
          '--class',
          'program.to.run',
          '--local-spark-event-logs-dir',
          '/opt/ml/processing/spark-events/',
          '/opt/ml/processing/input/code/my.jar']},
    ...
    

    【讨论】:

    • 非常感谢!有效。完全有道理。
    【解决方案2】:

    对于更现代的 sagemaker sdk 版本,您可以直接使用 run 方法。例如,对于“2.120.0”sagemaker sdk 版本:

    from sagemaker.workflow.steps import ProcessingStep
    from sagemaker.spark.processing import PySparkProcessor
    from sagemaker.workflow.pipeline_context import PipelineSession
    
    
    session = PipelineSession()
    
    spark_processor = PySparkProcessor(
        base_job_name="sm-spark",
        framework_version="3.1",
        role=role,
        instance_count=2,
        instance_type="ml.m5.xlarge",
        max_runtime_in_seconds=1200,
        sagemaker_session=session,
    )
    
    step_preprocess_data = ProcessingStep(
        name="spark-train-data",
        step_args=spark_processor.run(
            submit_app="./code/preprocess.py",
            arguments=[
                "--s3_input_bucket",
                bucket,
                "--s3_input_key_prefix",
                "user_filestore/marti/test-spark",
                "--s3_output_bucket",
                bucket,
                "--s3_output_key_prefix",
                "user_filestore/marti/test-spark",
            ],
            spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, "user_filestore/marti/test-spark"),
        )
    )
    

    【讨论】:

      猜你喜欢
      • 2023-01-13
      • 2022-07-27
      • 2022-12-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-07-01
      • 2020-01-06
      • 2022-11-19
      相关资源
      最近更新 更多