【Question title】: Airflow emrAddStepsOperator unable to execute spark shaded jar
【Posted】: 2022-06-11 07:04:52
【Question description】:

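What should the step type be for a Spark app? The problem I am facing is that the master is not set, or yarn is not recognized. It seems that when using emrAddStepsOperator the application is treated as a plain JAR rather than being run in spark-submit mode. Please find attached the Airflow DAG, the error, and the EMR screenshots.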

Amazon EMR console: manually adding a Spark job as a step

Choosing the Spark application step type rather than the custom JAR step type gives the option to supply spark-submit arguments and main-method arguments separately

The step type can be streaming, Spark application, or custom JAR

Error:

Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:385)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2574)
    at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:934)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:928)
    at com.mcf.ExtractcustomerCategoryWiseSummarizedViews$.main(ExtractcustomerCategoryWiseSummarizedViews.scala:13)
    at com.mcf.ExtractcustomerCategoryWiseSummarizedViews.main(ExtractcustomerCategoryWiseSummarizedViews.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:236)

"""

This is an example dag for an AWS EMR pipeline.

Starting by creating a cluster, adding steps/operations, checking steps and finally when finished
terminating the cluster.
"""
import time
from airflow.operators.python import PythonOperator
from datetime import timedelta

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.utils.dates import days_ago

SPARK_STEPS = [    
    {    
        'Name': 'PerformETL',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
            #'MainClass': 'com.sadim.main',    
            'Args': ['spark-submit',    
                    '--deploy-mode',    
                    'cluster',    
                    '--master',    
                    'yarn',    
                    '--class',    
                    'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',     
                    '--mode',    
                'DeltaLoadByDays',    
                '--noOfDaysBehindTodayForDeltaLoad',    
                '1',    
                '--s3InputPath',    
                's3://data-lake/documents/accountscore/categoriseddata/',    
                '--s3OutputPathcustomerCategoryWiseSummarizedViews',    
                's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
        },
    }
]
SPARK_STEPS2 = [
    {
        'Name': 'sadim_test3',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://test-data/jars/scalatestnadeem-0.0.1-SNAPSHOT_v2.jar',
            'MainClass': 'com.sadim.scalatestnadeem.Test',
            'Args': ['spark-submit',    
                '--deploy-mode',    
                'client',    
                '--master',    
                'yarn',    
                '--conf',    
                'spark.yarn.submit.waitAppCompletion=true'],    
        },    
    }    
]    
SPARK_STEPS3 = [    
    {    
        'Name': 'sadim_test3',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {    
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT_masteryarnwithoutdependencyandtest.jar',    
            'MainClass': 'com.sadim.TestSadim',    
            'Args': ['spark-submit',     
                '--deploy-mode',    
                'client',     
                '--master',     
                'yarn',    
                '--conf',    
                'spark.yarn.submit.waitAppCompletion=true'],    
        },    
    }    
]    
SPARK_STEPS4 = [    
    {    
        'Name': 'PerformETL',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {    
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
            #'MainClass': 'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
            'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
                'spark-submit',    
                '--deploy-mode',    
                'client',    
                 '--master',    
                 'yarn',                    
                    '--mode',    
                'DeltaLoadByDays',    
                '--noOfDaysBehindTodayForDeltaLoad',    
                '1',    
                '--s3InputPath',    
                's3://data-lake/documents/accountscore/categoriseddata/',    
                '--s3OutputPathcustomerCategoryWiseSummarizedViews',    
                's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
        },    
    }    
]    
SPARK_STEPS5 = [    
    {    
        'Name': 'PerformETL',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {    
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
            #'MainClass': 'com.sadim.main',    
            'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
                    '--mode',    
                'DeltaLoadByDays',    
                '--noOfDaysBehindTodayForDeltaLoad',    
                '1',    
                '--s3InputPath',    
                's3://data-lake/documents/accountscore/categoriseddata/',    
                '--s3OutputPathcustomerCategoryWiseSummarizedViews',    
                's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
        },    
    }    
]    
JOB_FLOW_OVERRIDES = {    
    'Name': 'ob_emr_airflow_automation',    
    'ReleaseLabel': 'emr-6.6.0',    
    'LogUri': 's3://test-data/emr_logs/',    
    'Instances': {    
        'InstanceGroups': [    
            {    
                'Name': 'Master node',    
                'Market': 'ON_DEMAND',    
                'InstanceRole': 'MASTER',    
                'InstanceType': 'm5.xlarge',    
                'InstanceCount': 1    
            },    
            {    
                    'Name': "Slave nodes",    
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',    
                    'InstanceType': 'm5.xlarge',    
                    'InstanceCount': 1    
            }    
        ],    
        'Ec2SubnetId': 'subnet-03129248888a14196',    
        'Ec2KeyName': 'datalake-emr-nodes',    
        'KeepJobFlowAliveWhenNoSteps': True,    
        'TerminationProtected': False    
    },    
    'BootstrapActions': [    
                {    
                        'Name': 'Java11InstallBootstrap',    
                        'ScriptBootstrapAction': {    
                            'Path': 's3://test-data/jars/bootstrap.sh',    
                            'Args': [    
                            ]    
                        }    
                }    
    ],    
    'Configurations': [
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.driver.defaultJavaOptions": "-XX:OnOutOfMemoryError='kill -9 %p' -XX:MaxHeapFreeRatio=70",
                "spark.executor.defaultJavaOptions": "-verbose:gc -Xlog:gc*::time -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p' -XX:MaxHeapFreeRatio=70 -XX:+IgnoreUnrecognizedVMOptions"
            }
        }
    ],
    'JobFlowRole': 'DL_EMR_EC2_DefaultRole',    
    'ServiceRole': 'EMR_DefaultRole',    
}    

with DAG(
    dag_id='emr_job_flow_manual_steps_dag_v6',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email': ['sadim.nadeem@sadim.com'],
        'email_on_failure': False,
        'email_on_retry': False,
    },
    dagrun_timeout=timedelta(hours=1),
    start_date=days_ago(1),
    schedule_interval='0 3 * * *',
    tags=['example'],
) as dag:

    # [START howto_operator_emr_manual_steps_tasks]
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

    # Defined but not wired into the dependency chain below.
    delay_python_task: PythonOperator = PythonOperator(
        task_id="delay_python_task",
        dag=dag,
        python_callable=lambda: time.sleep(400),
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
        steps=SPARK_STEPS5,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id=cluster_creator.output,
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
    )

    cluster_creator >> step_adder >> step_checker >> cluster_remover

    # [END howto_operator_emr_manual_steps_tasks]

    # Task dependencies created via `XComArgs`:
    #   cluster_creator >> step_checker
    #   cluster_creator >> cluster_remover

【Comments】:

    Tags: apache-spark airflow hadoop-yarn amazon-emr distributed-computing


    【Solution 1】:

    The issue is resolved. In emrAddStepsOperator we need to use command-runner.jar as the Jar option and pass the specific ETL job jar inside Args, like this:

    SPARK_STEPS = [
        {
            'Name': 'DetectAndLoadOrphanTransactions',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                # command-runner.jar runs the command given in Args on the cluster
                'Jar': 'command-runner.jar',
                #'MainClass': 'com.sadim.main',
                'Args': [
                    'spark-submit',
                    '--deploy-mode',
                    'cluster',
                    '--master',
                    'yarn',
                    '--class',
                    'com.sadim.DetectAndLoadOrphanTransactions',
                    # the application jar; everything after it is passed to main()
                    's3://nadeem-test-data/jars/open-0.0.1-SNAPSHOT_yarn_yarndep_maininpom.jar',
                    '--Txnspath',
                    's3://nadeem-test-data/spark_output//CategorizedTxnsDataset//2022_load_v2',
                    '--demographyPath',
                    # trailing comma added: without it this path and '--outPath' are concatenated into one string
                    's3://nadeem-test-data/spark_output//customerDemographics//2022_load_v2',
                    '--outPath',
                    's3://nadeem-test-data/spark_output//orphan_ihub_ids//2022_load_v2',
                ],
            },
        }
    ]
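
The stack trace in the question ends in org.apache.hadoop.util.RunJar.main, which suggests the application jar was started directly as a plain Hadoop jar, so 'spark-submit', '--master' and 'yarn' were handed to the application's main method as ordinary arguments and no master URL was ever set. With command-runner.jar, the Args list is run as a command instead. A minimal sketch of a helper that builds such a step is shown here; spark_submit_step and its parameter names are hypothetical and not part of Airflow or boto3, and the S3 paths are the ones from the answer above.

```python
from typing import Sequence


def spark_submit_step(
    name: str,
    app_jar: str,
    main_class: str,
    app_args: Sequence[str] = (),
    deploy_mode: str = "cluster",
    action_on_failure: str = "CONTINUE",
) -> dict:
    """Build one EMR step dict that runs spark-submit via command-runner.jar.

    Hypothetical helper for illustration; it only assembles a plain dict.
    """
    args = [
        "spark-submit",
        "--deploy-mode", deploy_mode,
        "--master", "yarn",
        "--class", main_class,
        app_jar,      # spark-submit options must come before the application jar
        *app_args,    # everything after the jar is passed to the application's main()
    ]
    return {
        "Name": name,
        "ActionOnFailure": action_on_failure,
        "HadoopJarStep": {"Jar": "command-runner.jar", "Args": args},
    }


OUTPUT_ROOT = "s3://nadeem-test-data/spark_output"

SPARK_STEPS = [
    spark_submit_step(
        name="DetectAndLoadOrphanTransactions",
        app_jar="s3://nadeem-test-data/jars/open-0.0.1-SNAPSHOT_yarn_yarndep_maininpom.jar",
        main_class="com.sadim.DetectAndLoadOrphanTransactions",
        app_args=[
            "--Txnspath", f"{OUTPUT_ROOT}//CategorizedTxnsDataset//2022_load_v2",
            "--demographyPath", f"{OUTPUT_ROOT}//customerDemographics//2022_load_v2",
            "--outPath", f"{OUTPUT_ROOT}//orphan_ihub_ids//2022_load_v2",
        ],
    )
]
```

The resulting list can be passed as steps=SPARK_STEPS to EmrAddStepsOperator in the same way the DAG in the question passes SPARK_STEPS5. Building the list in one place also makes a missing comma between two arguments easier to spot, since each flag sits next to its value.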
    
    
