【问题标题】:Pass data between steps in Airflow from EMR job在 EMR 作业中的 Airflow 步骤之间传递数据
【发布时间】:2021-11-11 17:00:35
【问题描述】:

在气流中,我正在 EMR 集群上使用 EmrCreateJobFlowOperator 执行一个 jar。此作业在 S3 上写入一些数据,我想将此 S3 路径传递到气流 dag 中的下一步,因为此路径对于每次运行都是动态的。

我认为 XCom 在这种情况下没有用,因为我的工作是基于 Java 的,它只是作为 EMR 上的 jar 执行。另外,我不想让我的 Java 代码气流具体化,因为我也想单独使用它。

有什么解决方案可以做到这一点?

【问题讨论】:

    标签: amazon-web-services amazon-emr airflow message-passing


    【解决方案1】:

    我认为 XCom 是您唯一的选择。运行 EmrCreateJobFlowOperator 时,需要跟 EmrJobFlowSensor。我猜你可以用你自己的类覆盖传感器并检索写入数据的 s3 路径(我想你可以在某些状态元数据中将它返回给传感器) - 然后你的自定义传感器可以将数据推送到 xcom 和下一个依赖于传感器的操作员)可以从 xcom 读取该 s3 路径。

    【讨论】:

    • 如何以及在何处从 Java 代码(即 S3 路径)写入数据?另外,有什么方法可以在我的 Java 代码中以及在气流 DAG 的下一步中获取 EMR 集群 ID?也许我可以在 S3 路径中使用它作为前缀。
    • 我不知道有关 Java 代码的详细信息。您需要以某种方式在作业结果中传递您想要的数据(我猜这在某种程度上可以添加作为元数据的东西),您可以在作业完成时看到这些数据。
    【解决方案2】:

    对于 S3 路径,您可以使用 xcom 将其传递到下一步,或者将实际名称的散列/前缀版本作为路径,以便之后可以访问。

    用于获取集群 在使用 EmrCreateJobFlowOperator 创建作业时,您可以访问返回对象的outputkey 来获取 jobflow_id。这也是 cluster_id

       job_flow_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        )
    
       job_sensor = EmrJobFlowSensor(
        task_id='check_job_flow',
        job_flow_id=job_flow_creator.output,
           aws_conn_id='aws_default',
    )
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-07-11
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多