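【Question title】: How to connect to an AWS EMR Notebook with Airflow
【Posted】: 2021-12-13 08:10:43
【Question description】:

I want to connect Airflow to an EMR Notebook running on a cluster. I can already connect to the AWS EMR cluster itself, but I cannot connect to the notebook. Please help.

In the code below I load some files into an S3 bucket and then run some steps on my cluster, which already works. I also want to run a pre-made notebook on the EMR cluster, and that is the part I cannot connect to. Please help, thanks.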

from datetime import datetime, timedelta  # used in default_args below

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.emr_create_job_flow_operator import (
    EmrCreateJobFlowOperator,
)
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator import (
    EmrTerminateJobFlowOperator,
)
# Configurations
BUCKET_NAME = "as*****************"  # replace this with your bucket name
local_data = "./dags/data/movie_review.csv"
s3_data = "data/movie_review.csv"
local_script = "./dags/scripts/spark/random_text_classification.py"
s3_script = "scripts/random_text_classification.py"
s3_clean = "clean_data/"
SPARK_STEPS = [ # Note the params values are supplied to the operator
    {
        "Name": "Move raw data from S3 to HDFS",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "s3-dist-cp",
                "--src=s3://{{ params.BUCKET_NAME }}/data",
                "--dest=/movie",
            ],
        },
    },
    {
        "Name": "Classify movie reviews",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "client",
                "s3://{{ params.BUCKET_NAME }}/{{ params.s3_script }}",
            ],
        },
    },
    {
        "Name": "Move clean data from HDFS to S3",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "s3-dist-cp",
                "--src=/output",
                "--dest=s3://{{ params.BUCKET_NAME }}/{{ params.s3_clean }}",
            ],
        },
    },
]

# helper function
def _local_to_s3(filename, key, bucket_name=BUCKET_NAME):
    """Upload a local file to S3, overwriting any existing object at `key`."""
    s3 = S3Hook()
    s3.load_file(filename=filename, bucket_name=bucket_name, replace=True, key=key)

default_args = {
    "owner": "airflow",
    "depends_on_past": True,
    "wait_for_downstream": True,
    "start_date": datetime(2020, 10, 17),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}
dag = DAG(
    "spark_submit_airflow",
    default_args=default_args,
    schedule_interval="0 10 * * *",
    max_active_runs=1,
)

start_data_pipeline = DummyOperator(task_id="start_data_pipeline", dag=dag)

data_to_s3 = PythonOperator(
    dag=dag,
    task_id="data_to_s3",
    python_callable=_local_to_s3,
    op_kwargs={"filename": local_data, "key": s3_data,},
)
script_to_s3 = PythonOperator(
    dag=dag,
    task_id="script_to_s3",
    python_callable=_local_to_s3,
    op_kwargs={"filename": local_script, "key": s3_script,},
)

# Add your steps to the EMR cluster
step_adder = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id="j-***********", #cluster id
    aws_conn_id="aws_default",
    steps=SPARK_STEPS,
    params={ # these params are used to fill the parameterized values in SPARK_STEPS json
        "BUCKET_NAME": BUCKET_NAME,
        "s3_data": s3_data,
        "s3_script": s3_script,
        "s3_clean": s3_clean,
    },
    dag=dag,
)
last_step = len(SPARK_STEPS) - 1
# wait for the steps to complete
step_checker = EmrStepSensor(
    task_id="watch_step",
    job_flow_id="j-*************",#cluster ID
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')["
    + str(last_step)
    + "] }}",
    aws_conn_id="aws_default",
    dag=dag,
)

end_data_pipeline = DummyOperator(task_id="end_data_pipeline", dag=dag)

start_data_pipeline >> [data_to_s3, script_to_s3]  >> step_adder >> step_checker >> end_data_pipeline


【Comments】:

  • Please provide more details: what errors and blockers are you facing?

Tags: python apache-spark jupyter-notebook airflow amazon-emr


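【Solution 1】:

I don't think there is currently an EMR operator for notebooks.

To run a pre-made EMR notebook, you can use the start_notebook_execution method of the boto3 EMR client and give it the path of the pre-made notebook.

Write a custom Python operator that calls start_notebook_execution and use it in your pipeline. This operator needs a cluster ID; in your case that is the same job_flow_id you pass to EmrAddStepsOperator (step_adder). Note that the operator itself returns step IDs, not the cluster ID.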

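import boto3

REGION = "us-east-1"  # placeholder: replace with the region of your cluster

def start_nb_execution(cluster_id, **context):
    """Start an EMR notebook execution on the given cluster and return its ID."""
    emr = boto3.client('emr', region_name=REGION)
    start_nb = emr.start_notebook_execution(
        EditorId="YOUR_NOTEBOOK_ID",
        RelativePath="YOUR_NOTEBOOK_FILE_NAME",
        ExecutionEngine={'Id': cluster_id, 'Type': 'EMR'},
        ServiceRole='EMR_Notebooks_DefaultRole'
    )
    execution_id = start_nb['NotebookExecutionId']
    print("Started an execution: " + execution_id)
    # The return value is pushed to XCom when run through a PythonOperator
    return execution_id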

Call this function from a PythonOperator:

start_nb_execution_task = PythonOperator(
    task_id='start_nb_execution',
    python_callable=start_nb_execution,  # the function defined above
    provide_context=True,
    op_kwargs={"cluster_id": "j-***********"},  # cluster ID, same value as job_flow_id
    dag=dag,
)

Now you can add it to the pipeline:

start_data_pipeline >> [data_to_s3, script_to_s3] >> step_adder >> step_checker >> start_nb_execution_task >> end_data_pipeline

There is a good tutorial here, which also includes a sensor example for notebooks.
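Since the tutorial link did not survive, here is a minimal sketch of what such a notebook sensor check could look like. It polls the boto3 EMR client's describe_notebook_execution call with the execution ID returned by start_notebook_execution. The function name notebook_execution_finished, the default region, and the choice of which states count as failures are my assumptions, not taken from the tutorial.

```python
# States reported in NotebookExecution.Status by describe_notebook_execution.
SUCCESS_STATES = {"FINISHED"}
FAILURE_STATES = {"FAILED", "STOPPED"}  # assumption: treat a stopped run as a failure


def notebook_execution_finished(execution_id, client=None, region_name="us-east-1"):
    """Return True once the notebook execution has finished successfully.

    Returns False while the execution is still in progress and raises
    RuntimeError if it ended in one of FAILURE_STATES.
    """
    if client is None:
        # Imported lazily so the function can be exercised with a stub client.
        import boto3

        # region_name is a placeholder default; use your cluster's region.
        client = boto3.client("emr", region_name=region_name)

    response = client.describe_notebook_execution(NotebookExecutionId=execution_id)
    status = response["NotebookExecution"]["Status"]

    if status in FAILURE_STATES:
        raise RuntimeError(
            f"Notebook execution {execution_id} ended with status {status}"
        )
    return status in SUCCESS_STATES
```

A function like this could be used as the callable of a Python-based sensor task placed after the start_nb_execution task, with the execution ID pulled from that task's XCom return value. How the ID is passed in depends on your Airflow version, so treat the wiring as something to adapt.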

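【Comments】:

  • Thanks a lot, that helps. Do we also need to install boto3 in our base terminal, or will it just be picked up?
  • Glad to help. By base terminal, do you mean the place you run it from?
  • Yes, either the base terminal or the VS Code terminal in the project folder. By the way, I am getting an error on the emr object. Do I have to install boto3 there as well?
  • This is the error: 'EMR' object has no attribute 'start_notebook_execution'
  • Are you creating the EMR client with boto3? If so, you may need to check the version of your boto3 client; see start_notebook_execution here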
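
To follow up on the version question, a quick way to check is to print the installed boto3 version and test whether the EMR client exposes the method at all. This is only a sketch: the helper names and the default region are placeholders I made up, and it must be run in the same Python environment that the Airflow workers use.

```python
def supports_notebook_execution(client):
    """Return True if the client object exposes start_notebook_execution."""
    return callable(getattr(client, "start_notebook_execution", None))


def check_installed_boto3(region_name="us-east-1"):
    """Print the installed boto3 version and whether the EMR client has the method."""
    import boto3  # imported here so the helper above stays usable without boto3

    # Creating a client does not call AWS; region_name is a placeholder.
    client = boto3.client("emr", region_name=region_name)
    print("boto3 version:", boto3.__version__)
    print("start_notebook_execution available:", supports_notebook_execution(client))
```

Calling check_installed_boto3() in the failing environment shows whether the 'EMR' object error comes from an outdated boto3. If the method is reported as unavailable, upgrading boto3 in that environment is the likely fix.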