【Question title】: Cannot install additional requirements to apache airflow
【Posted】: 2021-06-05 15:53:00
【Question description】:

I am using the following docker-compose file, which is taken from: https://github.com/apache/airflow/blob/main/docs/apache-airflow/start/docker-compose.yaml

version: "3"
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.0-python3.7}
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKEND: "airflow.api.auth.backend.basic_auth"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on: &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test:
        [
          "CMD-SHELL",
          'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"',
        ]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: "true"
      _AIRFLOW_WWW_USER_CREATE: "true"
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  ######################################################
  # SPARK SERVICES
  ######################################################

  jupyterlab:
    image: andreper/jupyterlab:3.0.0-spark-3.0.0
    container_name: jupyterlab
    ports:
      - 8888:8888
      - 4040:4040
    volumes:
      - shared-workspace:/opt/workspace
  spark-master:
    image: andreper/spark-master:3.0.0
    container_name: spark-master
    ports:
      - 8081:8080
      - 7077:7077
    volumes:
      - shared-workspace:/opt/workspace
  spark-worker-1:
    image: andreper/spark-worker:3.0.0
    container_name: spark-worker-1
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8082:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
  spark-worker-2:
    image: andreper/spark-worker:3.0.0
    container_name: spark-worker-2
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8083:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master

volumes:
  postgres-db-volume:
  shared-workspace:
    name: "jordi_airflow"
    driver: local
    driver_opts:
      type: "none"
      o: "bind"
      device: "/Users/jordicrespoguzman/Projects/custom_airflow_spark/spark_folder"

I am trying to run the following DAG:

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.email import EmailOperator

from datetime import datetime, timedelta
import csv
import requests
import json

default_args = {
    "owner": "airflow",
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "admin@localhost.com",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def printar():
    print("success!")


with DAG(
    "forex_data_pipeline",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:

    downloading_rates = PythonOperator(task_id="test1", python_callable=printar)

    forex_processing = SparkSubmitOperator(
        task_id="spark1",
        application="/opt/airflow/dags/test.py",
        conn_id="spark_conn",
        verbose=False,
    )

    downloading_rates  >> forex_processing

But I see this error in the Airflow UI:

Broken DAG: [/opt/airflow/dags/dag_spark.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/opt/airflow/dags/dag_spark.py", line 7, in <module>
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
ModuleNotFoundError: No module named 'airflow.providers.apache'

I have specified the additional requirements to install in the docker-compose file:

_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}

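Did I write it incorrectly? How should I specify the additional requirements to install in Airflow? Can I pass a requirements.txt? If so, how do I specify the path?

【Comments】:

Tags: python docker docker-compose airflow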


【Solution 1】:

I used:

_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- package1 package2 package3 }

# Example
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-apache-spark}

# or:
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-oracle apache-airflow-providers-microsoft-mssql}

(Note: add one space after `:-`.)

Then, when the Airflow Docker container starts, it automatically pip-installs the listed packages.
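
Since the question also asks about requirements.txt, here is a minimal sketch of turning requirements-style lines into the space-separated value this variable expects. The function name `build_pip_additional_requirements` is hypothetical (it is not part of Airflow), and the sketch assumes each line is a plain specifier without spaces or pip options such as `-r`.

```python
from typing import Iterable


def build_pip_additional_requirements(lines: Iterable[str]) -> str:
    """Join requirement lines into one space-separated string.

    Blank lines and '#' comment lines (as found in a requirements.txt)
    are skipped. Hypothetical helper, not an Airflow API.
    """
    requirements = []
    for line in lines:
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        requirements.append(stripped)
    return " ".join(requirements)


if __name__ == "__main__":
    example_lines = [
        "# providers needed by the DAGs",
        "apache-airflow-providers-apache-spark",
        "",
        "apache-airflow-providers-oracle",
    ]
    # The printed line could be placed in the .env file next to docker-compose.yaml.
    value = build_pip_additional_requirements(example_lines)
    print(f"_PIP_ADDITIONAL_REQUIREMENTS={value}")
```

To read a real file, one could pass `open("requirements.txt")` as `lines`; the path is whatever location the file has on the host, since the value is computed before the containers start.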

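[References]

  • Running Airflow in Docker - Airflow Documentation
  • _PIP_ADDITIONAL_REQUIREMENTS
    • If not empty, the Airflow containers will attempt to install the requirements specified in the variable. Example: lxml==4.6.3 charset-normalizer==1.4.1. Available in Airflow image 2.1.1 and above.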
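
The version note above matters for the question, which uses the image `apache/airflow:2.0.0-python3.7`. As a rough sketch, the check below parses an image tag and compares it with 2.1.1. The helper names are hypothetical, and the sketch assumes the tag starts with the Airflow version, which holds for the official images but not necessarily for custom ones.

```python
import re
from typing import Optional, Tuple

# First image release that honours _PIP_ADDITIONAL_REQUIREMENTS, per the quoted docs.
MIN_SUPPORTED = (2, 1, 1)


def airflow_version_from_image(image: str) -> Optional[Tuple[int, ...]]:
    """Extract (major, minor, patch) from a tag such as 'apache/airflow:2.0.0-python3.7'."""
    _, _, tag = image.rpartition(":")
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", tag)
    if match is None:
        return None  # e.g. 'latest' or an untagged image: version unknown
    return tuple(int(part) for part in match.groups())


def supports_pip_additional_requirements(image: str) -> Optional[bool]:
    """Return True/False when the version is known, None when it cannot be parsed."""
    version = airflow_version_from_image(image)
    if version is None:
        return None
    return version >= MIN_SUPPORTED


if __name__ == "__main__":
    for image in ("apache/airflow:2.0.0-python3.7", "apache/airflow:2.1.1"):
        print(image, supports_pip_additional_requirements(image))
```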

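【Comments】:

    【Solution 2】:

    Support for the _PIP_ADDITIONAL_REQUIREMENTS environment variable has not been released yet. It is only supported by the development/unreleased version of the Docker image. It is planned to be available in Airflow 2.1.1. For more information, see: Adding extra requirements for build and runtime of the PROD image.

    For older versions, you should build a new image and set that image in docker-compose.yaml. This takes a few steps.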

    1. Create a new Dockerfile with the following content:
      FROM apache/airflow:2.0.0
      RUN pip install --no-cache-dir apache-airflow-providers-apache-spark
      
    2. Build the new image:
      docker build . --tag my-company-airflow:2.0.0
      
    3. Set this image in the docker-compose.yaml file:
      echo "AIRFLOW_IMAGE_NAME=my-company-airflow:2.0.0" >> .env
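
The file-writing parts of the steps above (steps 1 and 3) can be sketched in Python as follows. This is only an illustration: the function names are hypothetical, and the package list is an assumption based on the provider the question needs. The `docker build . --tag my-company-airflow:2.0.0` command from step 2 still has to be run separately.

```python
from pathlib import Path
from typing import List


def render_dockerfile(base_image: str, packages: List[str]) -> str:
    """Return Dockerfile text that extends base_image with extra pip packages."""
    return (
        f"FROM {base_image}\n"
        f"RUN pip install --no-cache-dir {' '.join(packages)}\n"
    )


def write_custom_image_files(
    project_dir: Path, base_image: str, packages: List[str], image_tag: str
) -> None:
    """Write the Dockerfile and append AIRFLOW_IMAGE_NAME to .env in project_dir."""
    (project_dir / "Dockerfile").write_text(render_dockerfile(base_image, packages))
    # Append rather than overwrite, mirroring `echo ... >> .env`.
    with (project_dir / ".env").open("a") as env_file:
        env_file.write(f"AIRFLOW_IMAGE_NAME={image_tag}\n")


if __name__ == "__main__":
    # Only prints the Dockerfile text; call write_custom_image_files to create files.
    print(render_dockerfile("apache/airflow:2.0.0", ["apache-airflow-providers-apache-spark"]))
```

Because docker-compose.yaml reads the image from `${AIRFLOW_IMAGE_NAME:-...}`, the entry in `.env` is what switches every Airflow service to the custom image.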
      

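    For more details, see: Official guide about running Airflow in docker-compose environment

    I particularly recommend this fragment, which describes what to do when you need to install a new pip package.

    ModuleNotFoundError: No module named 'XYZ'

    The Docker Compose file uses the latest Airflow image (apache/airflow). If you need to install a new Python library or system library, you can customize and extend it.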
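
To confirm that a customized image really contains the module named in the error, a small check like the one below could be run with the Python interpreter inside one of the Airflow containers. This is a sketch using only the standard library; `is_importable` is a hypothetical helper name.

```python
import importlib.util


def is_importable(module_name: str) -> bool:
    """Return True if module_name can be located by the current interpreter."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package (e.g. 'airflow.providers.apache') is missing.
        return False


if __name__ == "__main__":
    module = "airflow.providers.apache.spark.operators.spark_submit"
    status = "available" if is_importable(module) else "missing"
    print(f"{module}: {status}")
```

If the module is reported as missing in the scheduler or worker container, the DAG import will keep failing there, so the check is worth running in each service that parses or executes DAGs.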

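    I recommend checking the guide on building Docker Image. It explains how to install more complex dependencies.

    I also recommend using only the docker-compose file from the official website, and the one that matches your specific version. Newer versions of the docker-compose file may not work with older versions of Airflow, because we are constantly making many improvements to these files to increase stability, reliability, and user experience.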

    【Comments】:

    • Shouldn't it be --no-cache-dir rather than --no-cache-user in the Dockerfile?