【Question title】: Airflow KubernetesExecutor: worker pod terminates right after creation
【Posted】: 2020-08-06 13:45:15
【Question】:

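When I try to run a DAG with the KubernetesExecutor, the worker pod terminates immediately after starting.

I also have a question: why does the scheduler send LocalExecutor as an environment variable, as can be seen in the pod describe output? Is this the correct behaviour?

Please find all the relevant files below:

  1. airflow.cfg
  2. Worker pod describe output
  3. Worker pod logs
  4. DAG file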

Worker pod describe output:

Name:         tutorialv01printhello-50d3b9099ea64c19a51e2fb035eef8ac
Namespace:    default
Priority:     0
Node:         worker01/<node-ip>
Start Time:   <date-time>
Labels:       airflow-worker=<airflow-dummy>
              airflow_version=1.10.11
              dag_id=tutorial_v01
              execution_date=
              kubernetes_executor=True
              task_id=print_hello
              try_number=1
Annotations:  <none>
Status:       Failed
IP:           <Node Ip>
IPs:
  IP:  <Node Ip>
Containers:
  base:
    Container ID:  <container-id>
    Image:         <repo-name>/k8-airflow:latest
    Image ID:      docker-pullable://<repo-name>/k8-
    Port:          <none>
    Host Port:     <none>
    Command:
      airflow
      run
      tutorial_v01
      print_hello
      <date time>
      --local
      --pool
      default_pool
      -sd
      /usr/local/airflow/dags/tutorial_01.py
    State:          Terminated
      Reason:       Error
      Exit Code:    1
      Started:      Thu, 06 Aug 2020 13:20:21 +0000
      Finished:     Thu, 06 Aug 2020 13:20:22 +0000
    Ready:          False
    Restart Count:  0
    Environment Variables from:
      airflow-configmap  ConfigMap  Optional: false
    Environment:
      AIRFLOW__CORE__EXECUTOR:          LocalExecutor
      AIRFLOW__CORE__DAGS_FOLDER:       /usr/local/airflow/dags/repo/
      AIRFLOW__CORE__SQL_ALCHEMY_CONN:  <alchemy-postgres-conn-url>
    Mounts:
      /usr/local/airflow/dags from airflow-dags (ro)
      /usr/local/airflow/logs from airflow-logs (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-sdfdfdd (ro)
Conditions:
  Type              Status
  Initialized       True
  Ready             False
  ContainersReady   False
  PodScheduled      True
Volumes:
  airflow-dags:
    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:  airflow-dags
    ReadOnly:   false
  airflow-logs:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:
    SizeLimit:  <unset>
  default-token-mnh2t:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-mnh2t
    Optional:    false
QoS Class:       BestEffort
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute for 300s
                 node.kubernetes.io/unreachable:NoExecute for 300s
Events:
  Type    Reason     Age        From               Message
  ----    ------     ----       ----               -------
  Normal  Scheduled  <unknown>  default-scheduler  Successfully assigned default/tutorialv01printhello-50d3b9099ea64c19a51e2fb035eef8ac to worker01
  Normal  Pulling    8m4s       kubelet, worker01  Pulling image "<repo-name>/k8-airflow:latest"
  Normal  Pulled     8m1s       kubelet, worker01  Successfully pulled image "<repo-name>/k8-airflow:latest"
  Normal  Created    8m1s       kubelet, worker01  Created container base
  Normal  Started    8m1s       kubelet, worker01  Started container base

Worker pod logs:

  File "/usr/bin/airflow", line 25, in <module>
    from airflow.configuration import conf
  File "/usr/lib/python3.6/site-packages/airflow/__init__.py", line 31, in <module>
    from airflow.utils.log.logging_mixin import LoggingMixin
  File "/usr/lib/python3.6/site-packages/airflow/utils/__init__.py", line 24, in <module>
    from .decorators import apply_defaults as _apply_defaults
  File "/usr/lib/python3.6/site-packages/airflow/utils/decorators.py", line 36, in <module>
    from airflow import settings
  File "/usr/lib/python3.6/site-packages/airflow/settings.py", line 37, in <module>
    from airflow.configuration import conf, AIRFLOW_HOME, WEBSERVER_CONFIG  # NOQA F401
  File "/usr/lib/python3.6/site-packages/airflow/configuration.py", line 636, in <module>
    with open(TEST_CONFIG_FILE, 'w') as f:
PermissionError: [Errno 13] Permission denied: '/usr/local/airflow/unittests.cfg'

airflow.cfg:

apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-configmap
  labels:
    env: airflow-test
data:
  airflow.cfg: |
    [core]
    dags_folder = /usr/local/airflow/dags
    base_log_folder = /usr/local/airflow/logs
    logging_level = INFO
    executor = KubernetesExecutor
    parallelism = 32
    load_examples = False
    plugins_folder = /usr/local/airflow/plugins
    sql_alchemy_conn = postgresql+psycopg2://<username>:<pwd>@airflow-metastore:5432/airflow

    [celery]
    broker_url =
    result_backend =

    [webserver]
    base_url = http://0.0.0.0:8080
    rbac=False
    web_server_host = 0.0.0.0
    web_server_port = 8080
    dag_default_view = tree

    [kubernetes]
    namespace = default
    airflow_configmap =
    worker_service_account_name = default
    worker_container_image_pull_policy = Always
    worker_dags_folder = /usr/local/airflow/dags
    worker_container_repository = <repo-name>/k8-airflow
    worker_container_tag = latest
    delete_worker_pods = false
    env_from_configmap_ref = airflow-configmap
    git_repo = https://github.com/<repo-name>/airflow-dags
    git_branch = master
    git_sync_credentials_secret = git-credentials
    git_sync_root = /tmp/git
    git_dags_folder_mount_point = /usr/local/airflow/dags
    git_sync_container_repository = <repo-name>/git-sync
    git_sync_container_tag = latest
    git_sync_init_container_name = git-sync-clone
    dags_volume_claim = airflow-dags
    in_cluster = True
    dags_volume_subpath =
    dags_volume_mount_point =


    [kubernetes_environment_variables]
    AIRFLOW__CORE__EXECUTOR = KubernetesExecutor
    AIRFLOW__CORE__DAGS_FOLDER = /usr/local/airflow/dags


    [admin]
    hide_sensitive_variable_fields = True

Kubernetes manifests:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: default
  namespace: default
  labels:
    env: airflow-test
rules:
  - apiGroups: [""] # "" indicates the core API group
    resources: ["pods"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
  - apiGroups: ["batch", "extensions"]
    resources: ["jobs"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: default
  namespace: default
  labels:
    env: airflow-test
subjects:
  - kind: ServiceAccount
    name: default # Name of the ServiceAccount
    namespace: default
roleRef:
  kind: Role # This must be Role or ClusterRole
  name: default # This must match the name of the Role or ClusterRole you wish to bind to
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: default
  labels:
    env: airflow-test
spec:
  replicas: 1
  selector:
    matchLabels:
      env: airflow-test
  template:
    metadata:
      labels:
        env: airflow-test
    spec:
      initContainers:
        - name: "init"
          image: <repo-name>/k8-airflow
          imagePullPolicy: Always
          volumeMounts:
            - name: airflow-configmap
              mountPath: /usr/local/airflow/airflow.cfg
              subPath: airflow.cfg
            - name: airflow-dags
              mountPath: /usr/local/airflow/dags
          env:
            - name: SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
          command:
            - "bash"
          args:
            - "-cx"
            - "initdb.sh"

      containers:
        - name: webserver
          image: <repo-name>/k8-airflow
          imagePullPolicy: IfNotPresent
          env:
            - name: NODE
              value: "webserver"
          envFrom:
            - configMapRef:
                name: airflow-configmap
          ports:
            - name: webserver
              protocol: TCP
              containerPort: 8080
          volumeMounts:
            - mountPath: /usr/local/airflow/dags
              name: airflow-dags
            - mountPath: /usr/local/airflow/airflow.cfg
              name: airflow-configmap
              subPath: airflow.cfg
            - name: airflow-logs
              mountPath: /usr/local/airflow/logs
        - name: scheduler
          image: <repo-name>/k8-airflow
          imagePullPolicy: IfNotPresent
          env:
            - name: NODE
              value: "scheduler"
          envFrom:
            - configMapRef:
                name: airflow-configmap
          ports:
            - name: webserver
              protocol: TCP
              containerPort: 8080
          volumeMounts:
            - mountPath: /usr/local/airflow/dags
              name: airflow-dags
            - mountPath: /usr/local/airflow/airflow.cfg
              name: airflow-configmap
              subPath: airflow.cfg
            - name: airflow-logs
              mountPath: /usr/local/airflow/logs
      volumes:
        - name: airflow-configmap
          configMap:
            name: airflow-configmap
        - name: airflow-dags
          persistentVolumeClaim:
            claimName: airflow-dags
        - name: airflow-logs
          persistentVolumeClaim:
            claimName: airflow-logs

---

apiVersion: v1
kind: Service
metadata:
  name: airflow
  namespace: default
  labels:
    env: airflow-test
spec:
  type: NodePort
  ports:
    - name: webserver
      protocol: TCP
      port: 8080
      targetPort: 8080
      nodePort: 30003
  selector:
    env: airflow-test

DAG file:

import datetime as dt  # needed for dt.datetime / dt.timedelta below

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def print_world():
    print('world_1')

default_args = {
    'start_date': dt.datetime(2020, 8, 6, 9, 45, 0),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

with DAG('tutorial_v01',
         default_args=default_args,
         schedule_interval='*/30 * * * *',
         ) as dag:
    print_hello = BashOperator(task_id='print_hello',
                               bash_command='echo "hello"')
    sleep = BashOperator(task_id='sleep',
                         bash_command='sleep 5')
    print_world = PythonOperator(task_id='print_world',
                                 python_callable=print_world)

print_hello >> sleep >> print_world

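I have already given 777 permissions to /usr/local/airflow/ in the image I am using. Please let me know if anything else is needed.

【Comments】:

  • "Is this the correct behaviour?" -> Yes, it is correct. Each individual task runs in k8s under a LocalExecutor. I need more details to answer the rest. Are your webserver and scheduler pods running fine without any issues? From the worker pod logs, it looks like it cannot access the cfg file. What I would like to know is how you are mounting the volumes: how do the tasks see the code, and how do they write logs? Are you using a PV or PVC for this?
  • Yes, I am using a PVC and PV mounted to host directories, e.g. airflow-dags = /home/user/airflow-dags, and likewise for logs. My scheduler and webserver are working fine.
  • OK, let's debug this. After triggering the DAG, can you get into the worker pod from the CLI and find out which user it is running as? Then check whether the file /usr/local/airflow/unittests.cfg exists. If it does, what are its permissions, and can you perform any operations on it?
  • I am trying, but I can't, because the pod terminates immediately after starting.
  • Also check this; I think it may be your problem. It looks like it comes down to which user your pod runs as: issues.apache.org/jira/browse/AIRFLOW-6754. Let me know if this answers your question.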
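
Because the worker pod exits before anyone can exec into it, the checks suggested in the comments (which user the process runs as, whether the file exists, what its permissions are) could be scripted and run in the same image in place of the airflow command. The following is only a minimal sketch under that assumption: `describe_access` is a hypothetical helper, not part of Airflow, and the path is the one from the traceback. When the file does not exist yet, it inspects the parent directory instead, since that is where the file would have to be created.

```python
import os
import pwd
import stat


def describe_access(path):
    """Report the current user and whether `path` (or its parent) is writable."""
    uid = os.getuid()
    try:
        user = pwd.getpwuid(uid).pw_name
    except KeyError:
        user = None  # the uid has no passwd entry inside the container

    # If the file is missing, check the directory it would be created in.
    target = path if os.path.exists(path) else (os.path.dirname(path) or ".")
    info = {
        "uid": uid,
        "user": user,
        "path_exists": os.path.exists(path),
        "checked": target,
        "writable": os.access(target, os.W_OK),
    }
    if os.path.exists(target):
        st = os.stat(target)
        info["owner_uid"] = st.st_uid
        info["mode"] = stat.filemode(st.st_mode)
    return info


if __name__ == "__main__":
    # Path taken from the PermissionError in the worker pod logs.
    report = describe_access("/usr/local/airflow/unittests.cfg")
    for key, value in report.items():
        print(f"{key}: {value}")
```

If the reported uid differs from the owner of /usr/local/airflow and `writable` is False, that would be consistent with the PermissionError in the logs.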

Tags: docker kubernetes airflow


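【Answer 1】:

K8s runs Airflow as a Docker container. When you start the container, you need to run it as the airflow user.

This can be done in your Dockerfile, where you can instruct the container to run as a specific user. Let me know if you would like more information.

This also applies to your question above. Please refer to:

https://issues.apache.org/jira/browse/AIRFLOW-6754

Hope this answers your question. Let me know.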

【Answer 2】:

Following the link referenced by @hopeIsTheonlyWeapon, I got it working.

Define run_as_user = <user uid> in airflow.cfg (in the [kubernetes] section),

or define the environment variable AIRFLOW__KUBERNETES__RUN_AS_USER=<user uid> in the ConfigMap.

For example, if your AIRFLOW_HOME path is accessible to the airflow user, then:

• Find the user's uid by running id -u airflow
  • In my case, the uid is 1000
• So my airflow.cfg now has run_as_user = 1000

Reference: https://airflow.apache.org/docs/stable/configurations-ref.html#run-as-user
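
The uid lookup above can also be done from Python inside the worker image. This is just a sketch: `run_as_user_settings` is a hypothetical helper name, and the default user name `airflow` is an assumption carried over from this answer. It uses the standard-library `pwd` module, which gives the same number as `id -u airflow`, and formats both ways of setting the option mentioned above.

```python
import pwd


def run_as_user_settings(username="airflow"):
    """Look up the uid of `username` and format both run_as_user settings."""
    # pwd.getpwnam raises KeyError if the user does not exist in the image.
    uid = pwd.getpwnam(username).pw_uid
    return {
        # Line for the [kubernetes] section of airflow.cfg
        "airflow.cfg": f"run_as_user = {uid}",
        # Equivalent environment variable for the ConfigMap
        "env": f"AIRFLOW__KUBERNETES__RUN_AS_USER={uid}",
    }
```

Calling `run_as_user_settings()` in an image where the airflow user has uid 1000 would return the `run_as_user = 1000` line used in this answer.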
