【问题标题】:pod_mutation_hook function not working on airflow running in kubernetes using KubernetesExecutorpod_mutation_hook 函数不适用于使用 KubernetesExecutor 在 kubernetes 中运行的气流
【发布时间】:2020-04-06 18:58:12
【问题描述】:

我正在尝试将在 kubernetes 中运行的气流部署从 CeleryExecutor 迁移到 KubernetesExecutor。在我的本地开发环境中一切都很顺利(在 minikube 上运行),但是我需要在生产中加载一个 sidecar 容器来运行允许我连接到我的 sql 数据库的代理。经过一番谷歌搜索后,似乎在$PYTHONPATH 某处的airflow_local_settings.py 文件中定义pod_mutation_hook 函数是应该如何完成此任务的。

首先,我尝试根据this 示例在配置映射中定义它。例如

apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-config
  namespace: dev
data:
  ...

  AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: "airflow-logs"

  AIRFLOW__KUBERNETES__AIRFLOW_LOCAL_SETTINGS_CONFIGMAP: "airflow-config"
  ...

  airflow_local_settings.py: |
    from airflow.contrib.kubernetes.pod import Pod

    def pod_mutation_hook(pod: Pod):
        extra_labels = {
            "test-label": "True",
        }
        pod.labels.update(extra_labels)

我在airflow.cfg 文件中指定了此配置映射,它被拾取并正常安装,所有其他环境变量都正常工作,但pod_mutation_hook 似乎没有运行,因为没有标签被添加到启动的结果 pod由 kubernetes 执行程序(请注意,此处还指定了日志卷声明,并且可以正常工作)。

接下来,我尝试在 $AIRFLOW_HOME/configs/airflow_local_settings.py 下为工作启动的图像中定义 airflow_local_settings.py 文件,正如评论 here 中所建议的那样。我还从上面的airflow-config configmap 中删除了相关部分。这似乎也对为作业创建的结果 pod 没有影响,因为它也缺少指定的标签。

所以,我现在不确定如何继续,因为我不明白我应该如何指定 airflow_local_settings.py 文件和 pod_mutation_hook 函数,以便它们在运行之前实际改变 pod。任何帮助将不胜感激。谢谢。

【问题讨论】:

    标签: kubernetes airflow minikube kubernetesexecutor


    【解决方案1】:

    总结

    如果您希望 KubernetesExecutor 或 KubernetesPodOperator(具有不同的 Executor)启动的所有 Pod 上的 Sidecar 启动,那么您至少应该将您的 airflow_local_settings.py 文件放在调度程序的 PYTHONPATH 上,因为它们都已启动由调度程序。

    但是,如果您还希望在使用 KubernetesExecutor 时在由 KubernetesPodOperator 启动的 POD 上使用边车,则需要在 airflow.cfg 中设置 airflow_local_settings_configmap(就像在 https://github.com/astronomer/airflow-chart/blob/f3dddeffe43c92d594cfcfe9c5b001680f45a986/templates/configmap.yaml#L72 中所做的那样),就像您将 KubernetesExecutor 与 KubernetesExecutor 一起使用时一样, task pods (with KubernetesPodOperator) 将由worker POD启动。

    请注意我们如何将相同的配置映射传递给调度程序部署 (https://github.com/astronomer/airflow-chart/blob/f3dddeffe43c92d594cfcfe9c5b001680f45a986/templates/scheduler/scheduler-deployment.yaml#L125-L135) 和 airflow.cfg 本身,因为我们希望所有 POD 通过 pod_mutation_hook 进行变异。

    详情

    “airflow.cfg”和“airflow_local_settings.py”文件需要存在于 Scheduler 上(您的 Scheduler 是在 VM 上还是在 POD 上与此无关)。我们还添加了有关在何处输出此文件的文档:https://airflow.apache.org/docs/stable/concepts.html#where-to-put-airflow-local-settings-py

    现在,无论何时使用 KubernetesExecutorKubernetePodOperator,都会使用 pod_mutation_hook。 KubernetesExecutor 或 KubernetesExecutor 启动的 POD 将使用此突变挂钩。

    现在,回到配置图。当您使用KubernetesExecutor 并且有一个使用KuberneretPodOperator 的任务时,您需要airflow.cfgairflow_local_settings.py 文件都存在于KubernetesExecutor 启动的worker pod 上。

    KubernetesExecutor 会为此任务启动一个 Worker Pod。

    Scheduler Pod ---> Worker Pod (Pod_1 -- 由 KubernetesExecuetor 启动) --> (Pod_2 -- 由 Pod_1 任务使用 KubernetesPodOperator)

    现在,airflow.cfg (https://github.com/apache/airflow/blob/master/airflow/config_templates/default_airflow.cfg#L870-L1028) 中的整个 [kubernetes] 部分仅用于 KubernetesExecutor 并影响 KubernetesExecutor 启动的 Worker Pods 上挂载的内容.

    如果您不指定airflow_local_settings configmap,则airflow_local_settings 文件将不会挂载到worker pod(上例中的Pod_1),并且只会挂载airflow.cfg 文件。所以现在对于 Pod_2(由 Pod_1 启动)——(当您将 KubernetesPodOperator 与 KubernetesExecutor 一起使用时的特殊情况),因为 Pod_1(工作 POD)没有 airflow_local_settings.py 文件,即使调度程序有它,Pod_2 也不会发生变异,因为该文件在那边不存在。

    认为它与airflow.cfg 相同——为什么要将airflow.cfg 文件同时挂载到Scheduler POD 和worker POD。同样,对于这种极端情况,您在两个地方都需要 airflow_local_settings.py 文件。

    https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/kubernetes/worker_configuration.py#L279-L305 --> 此代码用于决定在 Worker Pod 上挂载什么 (REF_1)

    https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/executors/kubernetes_executor.py#L462-L481 --> 为 KubernetesExecutor 运行的每个任务创建的 Pod (REF_2) -- 变异应用于这个 POD,因为它是由调度程序启动的,它有 airflow_local_settings.py 文件

    https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py#L383 --> 此代码用于在使用 KubernetesPod Operator (REF_3) 时创建新的 POD -- 由于 airflow_local_settings.py 未安装在 REF_2 中生成的 POD 上,因此发生了突变'未应用于此 POD。

    【讨论】:

      【解决方案2】:

      我遇到了同样的问题,请确保airflow_local_settings 可以从调度程序中导入。您必须将这些更改烘焙到图像中。

      WORKDIR ${AIRFLOW_USER_HOME}
      ENV PYTHONPATH  $PYTHONPATH:$AIRFLOW_HOME/config/
      COPY airflow_local_settings.py $AIRFLOW_HOME/config/airflow_local_settings.py
      

      使用您在上面突出显示的 configmap 将使它们进入执行程序,但此时不需要,因此是一种无用的设置。随意阅读源代码:

      https://github.com/apache/airflow/blob/8465d66f05baeb73dd4479b019515c069444616e/airflow/settings.py

      【讨论】:

      • 链接状态 This could be used, for instance, to add sidecar or init containers to every worker pod launched by KubernetesExecutor or KubernetesPodOperator. 虽然我只能让它与由 KubernetesPodOperator 而不是 KubernetesExecutor 创建的 Pod 一起工作
      • 绝对不是无用的设置,需要时查看我的回答
      【解决方案3】:

      您是否在airflow.cfg 字段中设置“airflow_local_settings_configmap = airflow-configmap”?

      【讨论】:

        猜你喜欢
        • 2021-04-19
        • 1970-01-01
        • 2020-12-13
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-05-08
        相关资源
        最近更新 更多