【问题标题】:using celery in airflow在气流中使用芹菜
【发布时间】:2018-06-22 07:42:49
【问题描述】:

我是气流新手,现在我发现气流正在使用 celery 来安排其任务。要运行气流,我需要运行将启动 celery 的命令“airflow worker”。但是,这里总是有一个错误。由于我在互联网上搜索过,大多数问题都发生在用户自己编写的 celery.py 上。我只是通过启动气流来使用芹菜。所以有点不同。 任何人都可以帮助我吗?下面是bug的截图。

airflow@linux-test:~$ airflow worker
[2018-06-22 07:29:04,068] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-06-22 07:29:04,125] {driver.py:124} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2018-06-22 07:29:04,146] {driver.py:124} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
 -------------- celery@linux-test v4.2.0 (windowlicker)
---- **** -----
--- * ***  * -- Linux-4.15.0-22-generic-x86_64-with-Ubuntu-18.04-bionic 2018-06-22 07:29:04
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         airflow.executors.celery_executor:0x7f2267122310
- ** ---------- .> transport:   amqp://airflow:**@localhost:5672/airflow
- ** ---------- .> results:     postgresql://airflow:**@localhost:5432/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> default          exchange=default(direct) key=default
[2018-06-22 07:29:04,630] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-06-22 07:29:04,689] {driver.py:124} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2018-06-22 07:29:04,715] {driver.py:124} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
Starting flask
[2018-06-22 07:29:04,858] {_internal.py:88} INFO -  * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
[2018-06-22 07:29:06,122: ERROR/ForkPoolWorker-1] Pool process <celery.concurrency.asynpool.Worker object at 0x7f22648c8e10> error: TypeError("Required argument 'object' (pos 1) not found",)
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python2.7/site-packages/billiard/pool.py", line 289, in __call__
    sys.exit(self.workloop(pid=pid))
  File "/home/airflow/.local/lib/python2.7/site-packages/billiard/pool.py", line 347, in workloop
    req = wait_for_job()
  File "/home/airflow/.local/lib/python2.7/site-packages/billiard/pool.py", line 447, in receive
    ready, req = _receive(1.0)
  File "/home/airflow/.local/lib/python2.7/site-packages/billiard/pool.py", line 419, in _recv
    return True, loads(get_payload())
  File "/home/airflow/.local/lib/python2.7/site-packages/billiard/common.py", line 107, in pickle_loads
    return load(BytesIO(s))
TypeError: Required argument 'object' (pos 1) not found
[2018-06-22 07:29:06,127: ERROR/MainProcess] Process 'ForkPoolWorker-1' pid:18839 exited with 'exitcode 1'

【问题讨论】:

  • 您可以长期使用 LocalExecutor。我发现与 Celery 合作并不值得麻烦。我们使用 LocalExecutor 在 16 CPU 服务器上运行气流,它像冠军 24/7 一样处理工作。您的错误非常模糊,我认为这里的任何人都无法提供帮助。试试 Airflow 聊天室:gitter.im/apache/incubator-airflow
  • PS:WePay,使用 Airflow 并在 Airflow 上发布了大量情报,建议“除非必须,否则不要使用 CeleryExecutor”...wecode.wepay.com/posts/airflow-wepay

标签: celery airflow


【解决方案1】:

卸载 librabbitmq 对我有用:pip uninstall librabbitmq。我不太明白为什么,但显然,对该库进行了一些优化,导致事情失败。这是我在某个网站上找到的the answer(我不得不翻译页面,因此我无法很好地理解解决方案) 希望对你有帮助

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-03
    • 1970-01-01
    • 2017-09-08
    • 2018-08-24
    • 2019-12-06
    相关资源
    最近更新 更多