【问题标题】:Get context from Pod launched with Airflow KubernetesPodOperator从使用 Airflow KubernetesPodOperator 启动的 Pod 获取上下文
【发布时间】:2021-04-30 19:56:13
【问题描述】:

我们有一些使用 KubernetesPodOperator 启动 pod 的 dag,我正在尝试获取 pod 内部的一些信息,例如 dag_id、task_id、try_number、环境等。

我知道我可以从 Airflow 任务的上下文中获取这些信息(例如,Python Operator 上的 kwargs),但我一直在想,有没有一种方法可以从启动的 pod 中获取该上下文?

谢谢!

【问题讨论】:

  • 不能使用操作符将上下文信息作为配置映射或秘密或标签传递吗?

标签: python kubernetes airflow


【解决方案1】:

我找到了一个很好的解决方案

我为类 KubernetesPodOperator 制作了一个自定义包装器,并使用 Airflow 任务的上下文更新了 env_vars

import airflow.configuration as config
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator as AirflowKubernetesPodOperator

class KubernetesPodOperator(AirflowKubernetesPodOperator):

    def execute(self, context):

        environment = config.conf.get('webserver', 'web_server_name')

        ti = context['ti']
        dag_id = ti.dag_id
        task_id = ti.task_id
        run_id = context['run_id']
        try_number = str(ti._try_number)

        labels = { 
            'ENVIRONMENT' : environment,
            'DAG_ID'      : dag_id, 
            'TASK_ID'     : task_id, 
            'RUN_ID'      : run_id,
            'TRY_NUMBER'  : try_number,
        }

        self.env_vars.update(labels)
        super().execute(context)

【讨论】:

  • self.env_varsOptional[List[k8s.V1EnvVar]] = None 类型,所以 update 不能工作,因为它可能是 NoneList
  • 在气流 2.0 上 self._env_vars 是一个列表,但这适用于气流 1.X,因为 self.env_vars 是一个字典。几周前我写了一个函数来处理复古兼容性。
猜你喜欢
  • 1970-01-01
  • 2021-02-26
  • 2021-05-14
  • 2023-03-17
  • 1970-01-01
  • 2021-12-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多