【Question title】: Airflow 2.0.1: Pod Template Override not working as expected for KubernetesExecutor
【Posted】: 2021-06-10 22:37:18
【Question description】:

Setup: Airflow 2.0.1 with Kubernetes 1.18 and Python 3.8; Kubernetes client: 18.17.x

Pod template file:

apiVersion: v1
kind: Pod
metadata:
  name: workerPod

spec:
  containers:
    - args: []
      command: []
      env:
        - name: <Key>
          value: "<value>"
      envFrom: []
      name: base
      image: "<image_name>"
      imagePullSecrets: [name: "<image_pull_secrets>"]
      imagePullPolicy: "Always"
      ports: []
      volumeMounts:
        - mountPath: "<path>"
          name: "<name>"

The default configuration set in airflow.cfg is as follows:

[kubernetes]
pod_template_file = <path to template file>
worker_container_repository = <base-default-image>
worker_container_tag = <tag>
namespace = airflow
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
in_cluster = True
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = False
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
dags_in_image = True
dags_volume_mount_point = <volume-mount-point>
image_pull_secrets = <default-pull-secrets>

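The problem is that only some keys are read correctly from pod_template_file. All env variables are set correctly, and imagePullPolicy is read correctly too (verified by changing the value from imagePullPolicy: "IfNotPresent" to imagePullPolicy: "Always"), but the imagePullSecrets key is not picked up. I can tell because pulling the image from the ECR repository fails with a Base credentials not provided error. I have verified that the credentials are correct, and I can create a pod with them when I create it explicitly.

Even when I set imagePullSecrets directly in airflow.cfg, I still get the same error.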
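One thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: in the Kubernetes Pod API, imagePullSecrets is a field of the pod spec (spec.imagePullSecrets), while the template above lists it inside the container entry. The sketch below uses plain dictionaries that mirror the template; the helper name misplaced_pull_secrets is hypothetical and is not part of Airflow or the Kubernetes client.

```python
def misplaced_pull_secrets(pod):
    """Return the names of containers that carry an imagePullSecrets key.

    imagePullSecrets is defined on the pod spec, so a container-level
    entry is a likely sign of a misplaced field.
    """
    containers = pod.get("spec", {}).get("containers", [])
    return [c.get("name") for c in containers if "imagePullSecrets" in c]


# Mirrors the structure of the template in the question (placeholders kept).
template_as_posted = {
    "apiVersion": "v1",
    "kind": "Pod",
    "spec": {
        "containers": [
            {
                "name": "base",
                "image": "<image_name>",
                "imagePullPolicy": "Always",
                "imagePullSecrets": [{"name": "<image_pull_secrets>"}],
            }
        ]
    },
}

# Same template with the field moved up to the pod spec level.
template_pod_level = {
    "apiVersion": "v1",
    "kind": "Pod",
    "spec": {
        "imagePullSecrets": [{"name": "<image_pull_secrets>"}],
        "containers": [
            {
                "name": "base",
                "image": "<image_name>",
                "imagePullPolicy": "Always",
            }
        ],
    },
}

flagged_before = misplaced_pull_secrets(template_as_posted)
flagged_after = misplaced_pull_secrets(template_pod_level)
```

If the check flags the base container, moving the key up so that it sits directly under spec in the YAML file is a small change to try.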

I also tried creating a pod override explicitly with the V1 API, as follows:

start_task = PythonOperator(
            task_id=<start_task_id>, python_callable=<start_task_callabel>, op_args=[<args>], dag=dag,
            executor_config={
                "pod_template_file": "<path_to_template>",
                "pod_override": k8s.V1Pod(
                    spec=k8s.V1PodSpec(
                        containers=[
                            k8s.V1Container(
                                name="base",
                                image="<image_override>",
                                image_pull_policy="<pull_policy>"
                            ),
                        ],
                        image_pull_secrets=[k8s.V1LocalObjectReference('<image_pull_secrets>')],
                    )
                ),
            },
        )

In this case the Docker image is pulled correctly, without any authentication error. Unfortunately, the pod then throws an error: AttributeError: 'V1Container' object has no attribute '_startup_probe'

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.8/dist-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 234, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 120, in _run_task_by_local_task_job
    run_job.run()
  File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/base_job.py", line 237, in run
    self._execute()
  File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/local_task_job.py", line 84, in _execute
    if not self.task_instance.check_and_change_state_before_execution(
  File "/usr/local/lib/python3.8/dist-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1029, in check_and_change_state_before_execution
    session.commit()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 1046, in commit
    self.transaction.commit()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 504, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2540, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2682, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2642, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 586, in execute
    persistence.save_obj(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 230, in save_obj
    _emit_update_statements(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 885, in _emit_update_statements
    for (
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 626, in _collect_update_commands
    state.manager[propkey].impl.is_equal(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/sql/sqltypes.py", line 1738, in compare_values
    return x == y
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod.py", line 221, in __eq__
    return self.to_dict() == other.to_dict()
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod.py", line 196, in to_dict
    result[attr] = value.to_dict()
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod_spec.py", line 1004, in to_dict
    result[attr] = list(map(
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod_spec.py", line 1005, in <lambda>
    lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_container.py", line 660, in to_dict
    value = getattr(self, attr)
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_container.py", line 458, in startup_probe
    return self._startup_probe
AttributeError: 'V1Container' object has no attribute '_startup_probe'

【Question comments】:

    Tags: kubernetes airflow python-3.8 airflow-scheduler kubernetesexecutor


    【Solution 1】:

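    I had a similar issue. In our case, we had changed the Airflow container and upgraded the Kubernetes library in the new container. The new Kubernetes library is not necessarily at fault. Airflow had already serialized some objects (a TaskInstance in our case, and judging by the traceback you shared, in yours as well), and it later deserializes them back into Python objects. In your case it recreates a V1Container object from the serialized form it has stored. With the upgraded library, a V1Container built through its initializer gets a _startup_probe attribute. The serialized version does not have that attribute, so it appears to come from a library version that predates the commit which added it. Deserialization itself does not seem to cause a problem, but the error surfaces as soon as the to_dict method is called. In your case it was called for a comparison (__eq__); in mine it was called by __repr__ during logging.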
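    The mechanism can be reproduced without Airflow or the Kubernetes client. The following is a minimal sketch under stated assumptions: the Container class is a hypothetical stand-in for a generated model such as V1Container, and the restore step imitates what default unpickling does (create the instance with __new__, then restore its __dict__, without calling __init__). The SQLAlchemy compare_values frame in the traceback suggests the stored value is pickled, but that is an inference from the traceback, not something confirmed here.

```python
class Container:
    """Hypothetical stand-in for a generated client model (not the real class)."""

    def __init__(self, name):
        self._name = name
        # Attribute that only the "new" library version sets.
        self._startup_probe = None

    @property
    def name(self):
        return self._name

    @property
    def startup_probe(self):
        return self._startup_probe

    def to_dict(self):
        # Reads every declared attribute through getattr, like the
        # to_dict frames shown in the traceback.
        return {attr: getattr(self, attr) for attr in ("name", "startup_probe")}

    def __eq__(self, other):
        return self.to_dict() == other.to_dict()


def restore_without_init(cls, state):
    """Imitate default unpickling: no __init__ call, just restore __dict__."""
    obj = cls.__new__(cls)
    obj.__dict__.update(state)
    return obj


# State as an older library version would have stored it: no _startup_probe.
old_state = {"_name": "base"}

restored = restore_without_init(Container, old_state)  # succeeds silently
fresh = Container("base")                              # built by the new code

try:
    restored == fresh  # __eq__ -> to_dict -> startup_probe property
    error_message = None
except AttributeError as exc:
    error_message = str(exc)
```

    Restoring the object works, and the failure only appears once something calls to_dict on it, which matches the behaviour described above. Under these assumptions, keeping the library version identical across all Airflow components, or clearing the stale stored objects, would avoid the mismatch.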

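    The Airflow Slack community pointed me to a change that should fix this issue. I have not been able to test it yet, but I am sharing it here in case anyone else runs into this.

    【Comments】: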
