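【Posted】: 2020-08-06 13:45:15
【Question】:
When I try to run a DAG with the KubernetesExecutor, the worker pod terminates immediately after it starts.
I also have a question: why does the scheduler pass LocalExecutor as an environment variable to the worker (it shows up in the pod describe output below)? Is this the correct behavior?
Please find all the relevant files below: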
- airflow.cfg
- worker pod describe output
- worker pod logs
- DAG file
Worker pod describe output:
Name: tutorialv01printhello-50d3b9099ea64c19a51e2fb035eef8ac
Namespace: default
Priority: 0
Node: worker01/<node-ip>
Start Time: <date-time>
Labels: airflow-worker=<airflow-dummy>
airflow_version=1.10.11
dag_id=tutorial_v01
execution_date=
kubernetes_executor=True
task_id=print_hello
try_number=1
Annotations: <none>
Status: Failed
IP: <Node Ip>
IPs:
IP: <Node Ip>
Containers:
base:
Container ID: <container-id>
Image: <repo-name>/k8-airflow:latest
Image ID: docker-pullable://<repo-name>/k8-
Port: <none>
Host Port: <none>
Command:
airflow
run
tutorial_v01
print_hello
<date time>
--local
--pool
default_pool
-sd
/usr/local/airflow/dags/tutorial_01.py
State: Terminated
Reason: Error
Exit Code: 1
Started: Thu, 06 Aug 2020 13:20:21 +0000
Finished: Thu, 06 Aug 2020 13:20:22 +0000
Ready: False
Restart Count: 0
Environment Variables from:
airflow-configmap ConfigMap Optional: false
Environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__DAGS_FOLDER: /usr/local/airflow/dags/repo/
AIRFLOW__CORE__SQL_ALCHEMY_CONN: <alchemy-postgres-conn-url>
Mounts:
/usr/local/airflow/dags from airflow-dags (ro)
/usr/local/airflow/logs from airflow-logs (rw)
/var/run/secrets/kubernetes.io/serviceaccount from default-token-sdfdfdd (ro)
Conditions:
Type Status
Initialized True
Ready False
ContainersReady False
PodScheduled True
Volumes:
airflow-dags:
Type: PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
ClaimName: airflow-dags
ReadOnly: false
airflow-logs:
Type: EmptyDir (a temporary directory that shares a pod's lifetime)
Medium:
SizeLimit: <unset>
default-token-mnh2t:
Type: Secret (a volume populated by a Secret)
SecretName: default-token-mnh2t
Optional: false
QoS Class: BestEffort
Node-Selectors: <none>
Tolerations: node.kubernetes.io/not-ready:NoExecute for 300s
node.kubernetes.io/unreachable:NoExecute for 300s
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled <unknown> default-scheduler Successfully assigned default/tutorialv01printhello-50d3b9099ea64c19a51e2fb035eef8ac to worker01
Normal Pulling 8m4s kubelet, worker01 Pulling image "<repo-name>/k8-airflow:latest"
Normal Pulled 8m1s kubelet, worker01 Successfully pulled image "<repo-name>/k8-airflow:latest"
Normal Created 8m1s kubelet, worker01 Created container base
Normal Started 8m1s kubelet, worker01 Started container base
Worker pod logs:
File "/usr/bin/airflow", line 25, in <module>
from airflow.configuration import conf
File "/usr/lib/python3.6/site-packages/airflow/__init__.py", line 31, in <module>
from airflow.utils.log.logging_mixin import LoggingMixin
File "/usr/lib/python3.6/site-packages/airflow/utils/__init__.py", line 24, in <module>
from .decorators import apply_defaults as _apply_defaults
File "/usr/lib/python3.6/site-packages/airflow/utils/decorators.py", line 36, in <module>
from airflow import settings
File "/usr/lib/python3.6/site-packages/airflow/settings.py", line 37, in <module>
from airflow.configuration import conf, AIRFLOW_HOME, WEBSERVER_CONFIG # NOQA F401
File "/usr/lib/python3.6/site-packages/airflow/configuration.py", line 636, in <module>
with open(TEST_CONFIG_FILE, 'w') as f:
PermissionError: [Errno 13] Permission denied: '/usr/local/airflow/unittests.cfg'
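The last frames show that the failure happens while Airflow's configuration module is being imported, before the task itself runs: it tries to create unittests.cfg under AIRFLOW_HOME (/usr/local/airflow here). Below is a minimal, hypothetical re-creation of that step using only the standard library. It is not Airflow's actual code; the function name `ensure_test_config` and the file contents are made up, and it assumes the file is only written when it does not already exist.

```python
import os
import tempfile


def ensure_test_config(airflow_home: str) -> str:
    """Hypothetical re-creation of the step that fails in the traceback.

    Assumption: the file is only written when it is missing, so an existing
    unittests.cfg or a writable AIRFLOW_HOME avoids the error.
    """
    path = os.path.join(airflow_home, "unittests.cfg")
    if not os.path.isfile(path):
        # This open() corresponds to the failing line in the traceback: it
        # raises PermissionError [Errno 13] if the container's user cannot
        # create files in airflow_home.
        with open(path, "w") as f:
            f.write("# placeholder contents, not Airflow's real test config\n")
    return path


if __name__ == "__main__":
    # Demonstrate on a throwaway directory instead of /usr/local/airflow.
    with tempfile.TemporaryDirectory() as home:
        created = ensure_test_config(home)
        print(os.path.basename(created))  # prints: unittests.cfg
```

In other words, what matters is whether the user the worker container runs as can create files in that directory at runtime, not only how the directory looked when the image was built.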
airflow.cfg (provided through a ConfigMap):
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-configmap
  labels:
    env: airflow-test
data:
  airflow.cfg: |
    [core]
    dags_folder = /usr/local/airflow/dags
    base_log_folder = /usr/local/airflow/logs
    logging_level = INFO
    executor = KubernetesExecutor
    parallelism = 32
    load_examples = False
    plugins_folder = /usr/local/airflow/plugins
    sql_alchemy_conn = postgresql+psycopg2://<username>:<pwd>@airflow-metastore:5432/airflow
    [celery]
    broker_url =
    result_backend =
    [webserver]
    base_url = http://0.0.0.0:8080
    rbac=False
    web_server_host = 0.0.0.0
    web_server_port = 8080
    dag_default_view = tree
    [kubernetes]
    namespace = default
    airflow_configmap =
    worker_service_account_name = default
    worker_container_image_pull_policy = Always
    worker_dags_folder = /usr/local/airflow/dags
    worker_container_repository = <repo-name>/k8-airflow
    worker_container_tag = latest
    delete_worker_pods = false
    env_from_configmap_ref = airflow-configmap
    git_repo = https://github.com/<repo-name>/airflow-dags
    git_branch = master
    git_sync_credentials_secret = git-credentials
    git_sync_root = /tmp/git
    git_dags_folder_mount_point = /usr/local/airflow/dags
    git_sync_container_repository = <repo-name>/git-sync
    git_sync_container_tag = latest
    git_sync_init_container_name = git-sync-clone
    dags_volume_claim = airflow-dags
    in_cluster = True
    dags_volume_subpath =
    dags_volume_mount_point =
    [kubernetes_environment_variables]
    AIRFLOW__CORE__EXECUTOR = KubernetesExecutor
    AIRFLOW__CORE__DAGS_FOLDER = /usr/local/airflow/dags
    [admin]
    hide_sensitive_variable_fields = True
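A quick way to double-check what this ConfigMap actually tells the workers is to parse its airflow.cfg payload with Python's standard `configparser`. The sketch below uses an abridged copy of the values above (only a few keys are included, and `load_cfg` is a hypothetical helper). Setting `optionxform = str` keeps option names case-sensitive; by default `configparser` lowercases them, which would turn AIRFLOW__CORE__EXECUTOR into airflow__core__executor.

```python
import configparser

# Abridged copy of the airflow.cfg embedded in the ConfigMap above.
AIRFLOW_CFG = """
[core]
dags_folder = /usr/local/airflow/dags
executor = KubernetesExecutor

[kubernetes]
worker_dags_folder = /usr/local/airflow/dags
git_dags_folder_mount_point = /usr/local/airflow/dags
dags_volume_claim = airflow-dags
airflow_configmap =

[kubernetes_environment_variables]
AIRFLOW__CORE__EXECUTOR = KubernetesExecutor
AIRFLOW__CORE__DAGS_FOLDER = /usr/local/airflow/dags
"""


def load_cfg(text: str) -> configparser.ConfigParser:
    """Parse an airflow.cfg-style string without interpolation."""
    parser = configparser.ConfigParser(interpolation=None)
    # Keep option names exactly as written (the default lowercases them).
    parser.optionxform = str
    parser.read_string(text)
    return parser


if __name__ == "__main__":
    cfg = load_cfg(AIRFLOW_CFG)
    print("executor:", cfg["core"]["executor"])
    print("worker_dags_folder:", cfg["kubernetes"]["worker_dags_folder"])
    print("airflow_configmap:", repr(cfg["kubernetes"]["airflow_configmap"]))
    print("env vars:", dict(cfg["kubernetes_environment_variables"]))
```

Note that `airflow_configmap` parses as an empty string here, and the worker pod describe output above shows no airflow.cfg mount; the ConfigMap only appears there under "Environment Variables from".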
And the Kubernetes manifests:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: default
  namespace: default
  labels:
    env: airflow-test
rules:
  - apiGroups: [""] # "" indicates the core API group
    resources: ["pods"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
  - apiGroups: ["batch", "extensions"]
    resources: ["jobs"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: default
  namespace: default
  labels:
    env: airflow-test
subjects:
  - kind: ServiceAccount
    name: default # Name of the ServiceAccount
    namespace: default
roleRef:
  kind: Role # This must be Role or ClusterRole
  name: default # This must match the name of the Role or ClusterRole you wish to bind to
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: default
  labels:
    env: airflow-test
spec:
  replicas: 1
  selector:
    matchLabels:
      env: airflow-test
  template:
    metadata:
      labels:
        env: airflow-test
    spec:
      initContainers:
        - name: "init"
          image: <repo-name>/k8-airflow
          imagePullPolicy: Always
          volumeMounts:
            - name: airflow-configmap
              mountPath: /usr/local/airflow/airflow.cfg
              subPath: airflow.cfg
            - name: airflow-dags
              mountPath: /usr/local/airflow/dags
          env:
            - name: SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
          command:
            - "bash"
          args:
            - "-cx"
            - "initdb.sh"
      containers:
        - name: webserver
          image: <repo-name>/k8-airflow
          imagePullPolicy: IfNotPresent
          env:
            - name: NODE
              value: "webserver"
          envFrom:
            - configMapRef:
                name: airflow-configmap
          ports:
            - name: webserver
              protocol: TCP
              containerPort: 8080
          volumeMounts:
            - mountPath: /usr/local/airflow/dags
              name: airflow-dags
            - mountPath: /usr/local/airflow/airflow.cfg
              name: airflow-configmap
              subPath: airflow.cfg
            - name: airflow-logs
              mountPath: /usr/local/airflow/logs
        - name: scheduler
          image: <repo-name>/k8-airflow
          imagePullPolicy: IfNotPresent
          env:
            - name: NODE
              value: "scheduler"
          envFrom:
            - configMapRef:
                name: airflow-configmap
          ports:
            - name: webserver
              protocol: TCP
              containerPort: 8080
          volumeMounts:
            - mountPath: /usr/local/airflow/dags
              name: airflow-dags
            - mountPath: /usr/local/airflow/airflow.cfg
              name: airflow-configmap
              subPath: airflow.cfg
            - name: airflow-logs
              mountPath: /usr/local/airflow/logs
      volumes:
        - name: airflow-configmap
          configMap:
            name: airflow-configmap
        - name: airflow-dags
          persistentVolumeClaim:
            claimName: airflow-dags
        - name: airflow-logs
          persistentVolumeClaim:
            claimName: airflow-logs
---
apiVersion: v1
kind: Service
metadata:
  name: airflow
  namespace: default
  labels:
    env: airflow-test
spec:
  type: NodePort
  ports:
    - name: webserver
      protocol: TCP
      port: 8080
      targetPort: 8080
      nodePort: 30003
  selector:
    env: airflow-test
DAG file:
import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def print_world():
    print('world_1')


default_args = {
    'start_date': dt.datetime(2020, 8, 6, 9, 45, 0),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

with DAG('tutorial_v01',
         default_args=default_args,
         schedule_interval='*/30 * * * *',
         ) as dag:
    print_hello = BashOperator(task_id='print_hello',
                               bash_command='echo "hello"')
    sleep = BashOperator(task_id='sleep',
                         bash_command='sleep 5')
    print_world = PythonOperator(task_id='print_world',
                                 python_callable=print_world)
    print_hello >> sleep >> print_world
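The last line of the DAG relies on operator chaining: `a >> b` makes `b` downstream of `a` and returns `b`, so `print_hello >> sleep >> print_world` (evaluated left to right as `(print_hello >> sleep) >> print_world`) builds a linear chain. The `Task` class below is a hypothetical, Airflow-free stand-in that mimics only that behavior so it runs without Airflow installed; it is not Airflow's `BaseOperator`.

```python
from typing import List


class Task:
    """Hypothetical stand-in for an operator; records downstream task ids."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: List[str] = []

    def __rshift__(self, other: "Task") -> "Task":
        # a >> b: make b downstream of a, then return b so the chain continues.
        self.downstream.append(other.task_id)
        return other


if __name__ == "__main__":
    print_hello = Task("print_hello")
    sleep = Task("sleep")
    print_world = Task("print_world")
    print_hello >> sleep >> print_world
    print(print_hello.downstream, sleep.downstream, print_world.downstream)
    # prints: ['sleep'] ['print_world'] []
```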
I have already set permissions to 777 on /usr/local/airflow/ in the image I'm using. Please let me know if anything else is needed.
【Discussion】:
-
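Is this the correct behavior? -> Yes, it is. Each individual task is executed in k8s as part of a LocalExecutor. I need more details to answer the question, though. Are your webserver and scheduler pods running fine without any issues? From the worker pod logs, it looks like it cannot access the cfg file. What I'd like to know is how you mounted the volumes. How do the tasks see the code, and how do they write logs? Are you using any PV or PVC for this?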
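To illustrate why LocalExecutor in the worker pod is expected: Airflow lets an environment variable named AIRFLOW__{SECTION}__{KEY} override the matching airflow.cfg entry, so a worker pod started with AIRFLOW__CORE__EXECUTOR=LocalExecutor runs its task with LocalExecutor even though airflow.cfg says KubernetesExecutor. The `resolve` function below is a simplified, hypothetical resolver that shows only this precedence rule; Airflow's real lookup consults more sources than this.

```python
import configparser
import os
from typing import Mapping, Optional


def resolve(
    section: str,
    key: str,
    cfg: configparser.ConfigParser,
    environ: Mapping[str, str] = os.environ,
) -> Optional[str]:
    """Simplified lookup: environment variable first, then airflow.cfg."""
    env_name = f"AIRFLOW__{section.upper()}__{key.upper()}"
    if env_name in environ:
        return environ[env_name]
    if cfg.has_option(section, key):
        return cfg.get(section, key)
    return None


if __name__ == "__main__":
    cfg = configparser.ConfigParser(interpolation=None)
    cfg.read_string("[core]\nexecutor = KubernetesExecutor\n")
    # No override present: the airflow.cfg value is used.
    print(resolve("core", "executor", cfg, environ={}))
    # Override present, as in the worker pod's "Environment" section.
    print(resolve("core", "executor", cfg,
                  environ={"AIRFLOW__CORE__EXECUTOR": "LocalExecutor"}))
```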
-
Yes, I'm using PVCs and PVs mounted to host directories, e.g. airflow-dags = /home/user/airflow-dags, and the same for logs. My scheduler and webserver are working fine.
-
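OK, let's debug it. After triggering the DAG, can you exec into the worker pod from the CLI and find out which user it is running as? Then check whether the file /usr/local/airflow/unittests.cfg exists. If it does, what are its permissions, and can you perform any operations on it?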
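The checks suggested above (which user the container runs as, whether /usr/local/airflow/unittests.cfg exists, and what its permissions are) can be gathered in one pass with a small standard-library script. `access_report` is a hypothetical helper, not part of Airflow, and it is POSIX-only because it uses `os.getuid()`. Since the worker pod exits immediately, this assumes you run it from a shell in a container started from the same image, for example with the command overridden.

```python
import os
import stat
from typing import Dict, Optional, Union

Report = Dict[str, Union[int, bool, Optional[str]]]


def access_report(path: str) -> Report:
    """Collect who we are and what we may do with `path` and its directory."""
    parent = os.path.dirname(path) or "."
    exists = os.path.exists(path)
    mode: Optional[str] = None
    if exists:
        # Symbolic permissions, e.g. '-rw-r--r--' for a regular file.
        mode = stat.filemode(os.stat(path).st_mode)
    return {
        "uid": os.getuid(),
        "gid": os.getgid(),
        "exists": exists,
        "mode": mode,
        "file_writable": exists and os.access(path, os.W_OK),
        "dir_writable": os.access(parent, os.W_OK),
    }


if __name__ == "__main__":
    # Path taken from the PermissionError in the worker pod logs.
    for key, value in access_report("/usr/local/airflow/unittests.cfg").items():
        print(f"{key}: {value}")
```

If `exists` is False and `dir_writable` is False for the container's uid, that matches the PermissionError in the logs.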
-
I'm trying, but I can't, because the pod terminates immediately after it starts.
-
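Also check this; I think it might be your problem. It looks like it comes down to which user your pod runs as: issues.apache.org/jira/browse/AIRFLOW-6754. Let me know if this answers your question.
Tags: docker kubernetes airflow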