【问题标题】:airflow command error: argument GROUP_OR_COMMAND: celery subcommand works only with CeleryExecutor气流命令错误:参数 GROUP_OR_COMMAND:celery 子命令仅适用于 CeleryExecutor
【发布时间】:2022-11-06 02:36:29
【问题描述】:

我正在尝试为我的项目设置气流集群,并且我正在使用 celery 执行器作为执行器。除此之外,我使用 Rabbitmq 作为队列服务,使用 postgresql 作为数据库。现在我有两个主节点和两个工作节点。所有服务都已启动并运行,我能够使用气流网络服务器和调度程序配置我的主节点。但是对于我的工作节点,我遇到了一个错误:

气流命令错误:参数 GROUP_OR_COMMAND:celery 子命令仅适用于 CeleryExecutor、CeleryKubernetesExecutor 和从它们派生的执行器,您当前的执行器:SequentialExecutor,子类来自:BaseExecutor,请参阅上面的帮助。

我确实正确配置了airflow.cfg。我确实将 executor 值设置为 CeleryExecutor (这不意味着我已经设置了 executor 值)。

我的airflow.cfg如下: 注意:我只是添加了我认为与问题相关的部分配置。

[celery]

# This section only applies if you are using the CeleryExecutor in
# ``[core]`` section above
# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# ``airflow celery worker`` command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
worker_concurrency = 16

# The maximum and minimum concurrency that will be used when starting workers with the
# ``airflow celery worker`` command (always keep minimum processes, but grow
# to maximum if necessary). Note the value should be max_concurrency,min_concurrency
# Pick these numbers based on resources on worker box and the nature of the task.
# If autoscale option is available, worker_concurrency will be ignored.
# http://docs.celeryproject.org/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-autoscale
# Example: worker_autoscale = 16,12
# worker_autoscale =

# Used to increase the number of tasks that a worker prefetches which can improve performance.
# The number of processes multiplied by worker_prefetch_multiplier is the number of tasks
# that are prefetched by a worker. A value greater than 1 can result in tasks being unnecessarily
# blocked if there are multiple workers and one worker prefetches tasks that sit behind long
# running tasks while another worker has unutilized processes that are unable to process the already
# claimed blocked tasks.
# https://docs.celeryproject.org/en/stable/userguide/optimizing.html#prefetch-limits
worker_prefetch_multiplier = 1

# Specify if remote control of the workers is enabled.
# When using Amazon SQS as the broker, Celery creates lots of ``.*reply-celery-pidbox`` queues. You can
# prevent this by setting this to false. However, with this disabled Flower won't work.
worker_enable_remote_control = true

# Umask that will be used when starting workers with the ``airflow celery worker``
# in daemon mode. This control the file-creation mode mask which determines the initial
# value of file permission bits for newly created files.
worker_umask = 0o077

# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more information.
broker_url = amqp://admin:password@{hostname}:5672/

# The Celery result_backend. When a job finishes, it needs to update the
# metadata of the job. Therefore it will post a message on a message bus,
# or insert it into a database (depending of the backend)
# This status is used by the scheduler to update the state of the task
# The use of a database is highly recommended
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
result_backend = db+postgresql://postgres:airflow@postgres/airflow
# The executor class that airflow should use. Choices include
# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,
# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the
# full import path to the class when using a custom executor.
executor = CeleryExecutor

如果我没有添加与我的问题相关的足够信息,请告诉我。谢谢你。

【问题讨论】:

    标签: airflow


    【解决方案1】:

    上述错误的原因可能是:-

    Airflow 正在选择位于airflow.cfg 核心部分的执行器的默认值(即SequentialExecutor)。 This is the template for Airflow's default configuration. When Airflow is imported, it looks for a configuration file at $AIRFLOW_HOME/airflow.cfg. If it doesn't exist, Airflow uses this template

    如果您使用的是official helm chart,则以下解决方案适用:-

    1. 在airflow.cfg 的核心部分更改执行器的默认值。 Snapshot of default configuration

    2. 在花部署/容器中传递名为 AIRFLOW_HOME 的环境变量。您可以通过在 helm 图表的值文件中传递以下内容来简单地在所有容器中传递环境变量:

      env:
        - name: "AIRFLOW_HOME"
          value: "/path/to/airflow/home"
      
      1. 如果气流用户无权访问您在环境变量 AIRFLOW_HOME 中传递的路径,请以 root 用户身份运行花容器,这可以通过在 helm chat 的值文件中传递以下配置来完成。
      flower:
        enabled: true
        securityContext:
          runAsUser: 0
      

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-03-26
      • 2021-10-30
      • 1970-01-01
      • 2019-09-10
      • 1970-01-01
      • 1970-01-01
      • 2017-06-26
      • 2016-03-31
      相关资源
      最近更新 更多