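【Question title】: Unable to run airflow scheduler
【Posted】: 2023-04-04 03:01:01
【Question】:

I recently installed Airflow on an AWS server running Ubuntu 16.04, following this guide. After a painful but successful installation I started the webserver. I then tried a sample DAG, shown below: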

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
    'depends_on_past': False,
}


dag = DAG('init_run', default_args=default_args, description='DAG SAMPLE',
          schedule_interval='@daily')


def print_something():
    print("HELLO AIRFLOW!")


with dag:
    task_1 = PythonOperator(task_id='do_it', python_callable=print_something)
    task_2 = DummyOperator(task_id='dummy')

    # task_2 ('dummy') is upstream: it runs before task_1 ('do_it')
    task_1 << task_2

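But when I open the UI, the tasks in the DAG stay in the "no status" state, no matter how many times I trigger the DAG manually or refresh the page.

Later I found that the Airflow scheduler was not running and showed the following error: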

{celery_executor.py:228} ERROR - Error sending Celery task:No module named 'MySQLdb'
Celery Task ID: ('init_run', 'dummy', datetime.datetime(2019, 5, 30, 18, 0, 24, 902499, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1)
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 118, in send_task_to_executor
    result = task.apply_async(args=[command], queue=queue)
  File "/usr/local/lib/python3.7/site-packages/celery/app/task.py", line 535, in apply_async
    **options
  File "/usr/local/lib/python3.7/site-packages/celery/app/base.py", line 728, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/usr/local/lib/python3.7/site-packages/celery/app/amqp.py", line 552, in send_task_message
    **properties
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 510, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 194, in _publish
    [maybe_declare(entity) for entity in declare]
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 194, in <listcomp>
    [maybe_declare(entity) for entity in declare]
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 102, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
  File "/usr/local/lib/python3.7/site-packages/kombu/common.py", line 121, in maybe_declare
    return _maybe_declare(entity, channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/common.py", line 145, in _maybe_declare
    entity.declare(channel=channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 608, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 617, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 652, in queue_declare
    nowait=nowait,
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 531, in queue_declare
    self._new_queue(queue, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 82, in _new_queue
    self._get_or_create(queue)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 70, in _get_or_create
    obj = self.session.query(self.queue_cls) \
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 65, in session
    _, Session = self._open()
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 56, in _open
    engine = self._engine_from_config()
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 51, in _engine_from_config
    return create_engine(conninfo.hostname, **transport_options)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/__init__.py", line 443, in create_engine
    return strategy.create(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/strategies.py", line 87, in create
    dbapi = dialect_cls.dbapi(**dbapi_args)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 104, in dbapi
    return __import__("MySQLdb")
ModuleNotFoundError: No module named 'MySQLdb'

Here are the settings from the config file (airflow.cfg):

sql_alchemy_conn = postgresql+psycopg2://airflow@localhost:5432/airflow
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
result_backend =  db+postgresql://airflow:airflow@localhost/airflow

I have been struggling with this issue for two days now. Please help.

【Comments】:

    Tags: postgresql amazon-ec2 ubuntu-16.04 airflow airflow-scheduler


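    【Answer 1】:

    In your airflow.cfg there should also be a config option called celery_result_backend. Can you tell us what it is set to? If it is not in your config, set it to the same value as result_backend.

    For example: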

    celery_result_backend =  db+postgresql://airflow:airflow@localhost/airflow
    

    Then restart the Airflow stack to make sure the configuration change is applied.
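
To check which of the two option names is actually present before restarting, something like the following could be used. It is only a sketch: the config text is an inline sample rather than the real airflow.cfg, the `[celery]` section name is an assumption about where these options live, and `result_backend_keys` is a hypothetical helper name.

```python
import configparser

# Inline sample standing in for airflow.cfg.
# Assumption: the options live in a [celery] section.
SAMPLE_CFG = """
[celery]
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
result_backend = db+postgresql://airflow:airflow@localhost/airflow
"""


def result_backend_keys(cfg_text, section="celery"):
    """Return the result-backend option names present in the given section."""
    # Interpolation is disabled so '%' characters in URLs are left alone.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    candidates = ("result_backend", "celery_result_backend")
    return [key for key in candidates if parser.has_option(section, key)]


found = result_backend_keys(SAMPLE_CFG)
print("present:", found)
```

To run it against a real installation, the sample string would be replaced with the contents of the actual airflow.cfg file.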

    (I wanted to post this as a comment, but I don't have enough reputation to do so.)

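    【Comments】:

    • I replaced celery_result_backend with result_backend... are you suggesting I need to have both of them?
    • Did the change make any difference, or do you still have the same problem? You should only need one, depending on which version of Celery you are using (more info here). I don't think having both in the config does any harm, but it is best to use the one that matches the Celery version you are running.
    • I will try this today and let you know.
    • Yes, this worked for me. As far as I can tell, this should be the accepted answer.
    【Answer 2】: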

    I think the guide you followed did not tell you to install MySQL, yet you appear to be using it in the broker URL.
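
To make the mismatch visible, the sketch below extracts the backend named in each connection URL from the question's airflow.cfg. It relies only on the URL scheme: the `sqla+` and `db+` prefixes select the SQLAlchemy-based broker transport and database result backend, and a suffix such as `+psycopg2` names the driver. `backend_of` is a hypothetical helper introduced for illustration.

```python
from urllib.parse import urlsplit

# Connection settings copied from the airflow.cfg in the question.
settings = {
    "sql_alchemy_conn": "postgresql+psycopg2://airflow@localhost:5432/airflow",
    "broker_url": "sqla+mysql://airflow:airflow@localhost:3306/airflow",
    "result_backend": "db+postgresql://airflow:airflow@localhost/airflow",
}


def backend_of(url):
    """Return the database named by the URL scheme, minus wrapper prefixes."""
    scheme = urlsplit(url).scheme
    for prefix in ("sqla+", "db+"):
        if scheme.startswith(prefix):
            scheme = scheme[len(prefix):]
    return scheme.split("+")[0]  # drop a driver suffix such as "+psycopg2"


backends = {name: backend_of(url) for name, url in settings.items()}
for name, backend in backends.items():
    print(f"{name}: {backend}")
```

Only broker_url resolves to mysql, which matches the traceback: the failing import happens while the broker transport is creating its SQLAlchemy engine.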

    You can install the MySQL client library and configure it (Python 3.5+):

    pip install mysqlclient
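
The mysqlclient package is what provides the `MySQLdb` module named in the traceback. A quick way to confirm that the install landed in the interpreter Airflow uses is a check like the one below, run with that same Python. This is a minimal sketch; `has_module` is a hypothetical helper name.

```python
import importlib.util


def has_module(name):
    """Return True if a top-level module `name` is on this interpreter's import path."""
    return importlib.util.find_spec(name) is not None


# MySQLdb is needed by the sqla+mysql broker URL, psycopg2 by the Postgres URLs.
for module in ("MySQLdb", "psycopg2"):
    status = "available" if has_module(module) else "MISSING"
    print(f"{module}: {status}")
```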
    

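    Alternatively, as a quick fix, you can use RabbitMQ with the guest user login (RabbitMQ is a message broker, which you need in order to run Airflow DAGs with Celery).

    Then your broker_url will be: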

    broker_url = amqp://guest:guest@localhost:5672//
    

    If it is not already installed, RabbitMQ can be installed with the following command:

    sudo apt install rabbitmq-server
    

    Set NODE_IP_ADDRESS=0.0.0.0 in the config file located at:

    /etc/rabbitmq/rabbitmq-env.conf
    

    Start the RabbitMQ service:

    sudo service rabbitmq-server start
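
Once the service is started, a plain TCP check can confirm that something is listening on the port used in the broker_url above. This sketch only tests reachability and does not speak AMQP or verify the guest credentials; `port_open` is a hypothetical helper name.

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# 5672 is the port from broker_url = amqp://guest:guest@localhost:5672//
print("RabbitMQ reachable:", port_open("localhost", 5672))
```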
    

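    【Comments】:

    • Is the broker URL you mentioned for RabbitMQ? If so, can you provide a similar URL for Celery? I feel Celery is still trying to access MySQL even though I have pointed every path at Postgres.
    • This is actually for Celery (RabbitMQ's). Did you run airflow initdb and airflow upgradedb after changing sql_alchemy_conn = postgresql+psycopg2://airflow@localhost:5432/airflow?
    • Sorry, my bad, I was busy with other issues. Yes, I did run airflow initdb.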