【问题标题】:Airflow SparkKubernetesOperator logging气流 SparkKubernetesOperator 日志记录
【发布时间】:2022-12-15 01:59:04
【问题描述】:

我在 Airflow 中使用 KubernetesExecutor 作为执行器。 我的 DAG 代码

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor


dag = DAG(
   'spark_pi_using_spark_operator',
   default_args={'max_active_runs': 1},
   description='submit spark-pi as sparkApplication on kubernetes',
   schedule_interval=timedelta(days=1),
   start_date=datetime(2021, 1, 1),
   catchup=False,
)

t1 = SparkKubernetesOperator(
   task_id='spark_pi_submit',
   namespace="default",
   application_file="example_spark_kubernetes_spark_pi.yaml",
   do_xcom_push=True,
   dag=dag,
)

t2 = SparkKubernetesSensor(
   task_id='spark_pi_monitor',
   namespace="default",
   application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
   dag=dag,
)
t1 >> t2 

DAG 执行成功。我可以通过执行 kubectl logs spark-pi-driver 在 spark-driver 日志中看到输出

但是我无法在 Airflow UI 中看到相同的日志。

【问题讨论】:

  • 你为什么要见他们?该进程在远程机器上执行,Airflow 日志将仅显示进程报告的内容。如果您想收集日志并将它们转储到任务日志 - 您将需要编写此功能。

标签: apache-spark airflow google-spark-operator


【解决方案1】:

如下更新 SparkKubernetesSensor 配置

t2 = SparkKubernetesSensor(
   task_id='spark_pi_monitor',

   namespace="default",

   application_name="{task_instance.xcom_pull(task_ids='spark_pi_submit') 
                      ['metadata']['name'] }}",

   dag=dag,

   attach_log=True,
)

【讨论】:

    猜你喜欢
    • 2019-02-21
    • 2020-05-08
    • 2017-02-28
    • 1970-01-01
    • 2021-04-17
    • 2019-05-18
    • 1970-01-01
    • 1970-01-01
    • 2022-10-11
    相关资源
    最近更新 更多