【发布时间】:2021-08-06 19:41:59
【问题描述】:
我有一个带有 2 个运算符(一个 PythonOperator 和一个 KubernetesPodOperator)的简单 Airflow DAG:
with DAG(dag_id="dummy", start_date=datetime(2020, 11, 7), catchup=False) as dag:
logger = logging.getLogger("airflow.task")
volume_mount = k8s.v1_volume_mount.V1VolumeMount(name='osm-config',
mount_path=ROOT_PATH,
sub_path=None,
read_only=False)
pvc = k8s.V1PersistentVolumeClaimVolumeSource(claim_name="osm-config-pv-claim")
volume = k8s.v1_volume.V1Volume(name="osm-config",
persistent_volume_claim=pvc)
def do_it():
logger.debug("do work")
start = DummyOperator(task_id="start", dag=dag)
test = PythonOperator(task_id="test",
python_callable=do_it,
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[volume_mount]
)
],
volumes=[volume],
)
)
},
dag=dag)
download_data = KubernetesPodOperator(task_id="download_data",
namespace="default",
name="openmaptiles_download_data",
image="openmaptiles/openmaptiles-tools",
cmds=["download-osm"],
volumes=[volume],
volume_mounts=[volume_mount],
dag=dag)
start >> download_data >> test
目标是让两个运营商都使用 1 个持久卷。 k8s 算子按预期获取挂载的值,并根据需要下载所有内容。但是,PythonOperator 将永远保持queued 状态。
跟踪调度程序 pod 显示以下错误:
版本“v1”中的 Pod 不能作为 Pod 处理:v1.Pod.Spec:v1.PodSpec.Containers:[]v1.Container:v1.Container.VolumeMounts:[]v1.VolumeMount:readObjectStart:expect { or n, but found ", error found in #10 byte of ...|-data"}
我怀疑这是由于未正确设置卷/卷安装,因为格式看起来不正确:
...
"volumeMounts": [
{
"mountPath": "/opt/airflow/dags",
"name": "dags-data"
},
{
"mountPath": "/opt/airflow/logs",
"name": "logs-data"
},
"{'mount_path': '/osm_config',\n 'mount_propagation': None,\n 'name': 'test',\n 'read_only': False,\n 'sub_path': None,\n 'sub_path_expr': None}"
]
但我的配置似乎与Airflow documentation一致
【问题讨论】:
标签: kubernetes airflow-scheduler airflow