【问题标题】:Use PythonOperator with mounted PersistentVolumeClaim将 PythonOperator 与已安装的 PersistentVolumeClaim 一起使用
【发布时间】:2021-08-06 19:41:59
【问题描述】:

我有一个带有 2 个运算符(一个 PythonOperator 和一个 KubernetesPodOperator)的简单 Airflow DAG:

with DAG(dag_id="dummy", start_date=datetime(2020, 11, 7), catchup=False) as dag:
    logger = logging.getLogger("airflow.task")

    volume_mount = k8s.v1_volume_mount.V1VolumeMount(name='osm-config',
                                                     mount_path=ROOT_PATH,
                                                     sub_path=None,
                                                     read_only=False)

    pvc = k8s.V1PersistentVolumeClaimVolumeSource(claim_name="osm-config-pv-claim")

    volume = k8s.v1_volume.V1Volume(name="osm-config",
                                    persistent_volume_claim=pvc)

    def do_it():
        logger.debug("do work")


    start = DummyOperator(task_id="start", dag=dag)

    test = PythonOperator(task_id="test",
                          python_callable=do_it,
                          executor_config={
                              "pod_override": k8s.V1Pod(
                                  spec=k8s.V1PodSpec(
                                      containers=[
                                          k8s.V1Container(
                                              name="base",
                                              volume_mounts=[volume_mount]
                                          )
                                      ],
                                      volumes=[volume],
                                  )
                              )
                          },
                          dag=dag)

    download_data = KubernetesPodOperator(task_id="download_data",
                                          namespace="default",
                                          name="openmaptiles_download_data",
                                          image="openmaptiles/openmaptiles-tools",
                                          cmds=["download-osm"],
                                          volumes=[volume],
                                          volume_mounts=[volume_mount],
                                          dag=dag)


    start >> download_data >> test

目标是让两个运营商都使用 1 个持久卷。 k8s 算子按预期获取挂载的值,并根据需要下载所有内容。但是,PythonOperator 将永远保持queued 状态。

跟踪调度程序 pod 显示以下错误:

版本“v1”中的 Pod 不能作为 Pod 处理:v1.Pod.Spec:v1.PodSpec.Containers:[]v1.Container:v1.Container.VolumeMounts:[]v1.VolumeMount:readObjectStart:expect { or n, but found ", error found in #10 byte of ...|-data"}

我怀疑这是由于未正确设置卷/卷安装,因为格式看起来不正确:

...

 "volumeMounts": [ 
   { 
     "mountPath": "/opt/airflow/dags", 
     "name": "dags-data" 
   }, 
   { 
     "mountPath": "/opt/airflow/logs", 
     "name": "logs-data" 
   }, 
   "{'mount_path': '/osm_config',\n 'mount_propagation': None,\n 'name': 'test',\n 'read_only': False,\n 'sub_path': None,\n 'sub_path_expr': None}" 
 ] 

但我的配置似乎与Airflow documentation一致

【问题讨论】:

    标签: kubernetes airflow-scheduler airflow


    【解决方案1】:

    问题在于传递给 PythonOperator 的 Volume 的类型。

    我最初的示例使用了k8s.v1_volume.V1Volumek8s.v1_volume_mount.V1VolumeMount,但改用k8s.V1Volumek8s.V1VolumeMount 创建了一个按预期安装卷的pod。

    【讨论】:

      猜你喜欢
      • 2014-04-12
      • 1970-01-01
      • 1970-01-01
      • 2020-03-22
      • 1970-01-01
      • 2012-09-13
      • 2016-11-05
      • 1970-01-01
      • 2011-04-08
      相关资源
      最近更新 更多