【发布时间】:2018-06-22 07:42:49
【问题描述】:
我是气流新手,现在我发现气流正在使用 celery 来安排其任务。要运行气流,我需要运行将启动 celery 的命令“airflow worker”。但是,这里总是有一个错误。由于我在互联网上搜索过,大多数问题都发生在用户自己编写的 celery.py 上。我只是通过启动气流来使用芹菜。所以有点不同。 任何人都可以帮助我吗?下面是bug的截图。
airflow@linux-test:~$ airflow worker
[2018-06-22 07:29:04,068] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-06-22 07:29:04,125] {driver.py:124} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2018-06-22 07:29:04,146] {driver.py:124} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
-------------- celery@linux-test v4.2.0 (windowlicker)
---- **** -----
--- * *** * -- Linux-4.15.0-22-generic-x86_64-with-Ubuntu-18.04-bionic 2018-06-22 07:29:04
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: airflow.executors.celery_executor:0x7f2267122310
- ** ---------- .> transport: amqp://airflow:**@localhost:5672/airflow
- ** ---------- .> results: postgresql://airflow:**@localhost:5432/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> default exchange=default(direct) key=default
[2018-06-22 07:29:04,630] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-06-22 07:29:04,689] {driver.py:124} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2018-06-22 07:29:04,715] {driver.py:124} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
Starting flask
[2018-06-22 07:29:04,858] {_internal.py:88} INFO - * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
[2018-06-22 07:29:06,122: ERROR/ForkPoolWorker-1] Pool process <celery.concurrency.asynpool.Worker object at 0x7f22648c8e10> error: TypeError("Required argument 'object' (pos 1) not found",)
Traceback (most recent call last):
File "/home/airflow/.local/lib/python2.7/site-packages/billiard/pool.py", line 289, in __call__
sys.exit(self.workloop(pid=pid))
File "/home/airflow/.local/lib/python2.7/site-packages/billiard/pool.py", line 347, in workloop
req = wait_for_job()
File "/home/airflow/.local/lib/python2.7/site-packages/billiard/pool.py", line 447, in receive
ready, req = _receive(1.0)
File "/home/airflow/.local/lib/python2.7/site-packages/billiard/pool.py", line 419, in _recv
return True, loads(get_payload())
File "/home/airflow/.local/lib/python2.7/site-packages/billiard/common.py", line 107, in pickle_loads
return load(BytesIO(s))
TypeError: Required argument 'object' (pos 1) not found
[2018-06-22 07:29:06,127: ERROR/MainProcess] Process 'ForkPoolWorker-1' pid:18839 exited with 'exitcode 1'
【问题讨论】:
-
您可以长期使用 LocalExecutor。我发现与 Celery 合作并不值得麻烦。我们使用 LocalExecutor 在 16 CPU 服务器上运行气流,它像冠军 24/7 一样处理工作。您的错误非常模糊,我认为这里的任何人都无法提供帮助。试试 Airflow 聊天室:gitter.im/apache/incubator-airflow
-
PS:WePay,使用 Airflow 并在 Airflow 上发布了大量情报,建议“除非必须,否则不要使用 CeleryExecutor”...wecode.wepay.com/posts/airflow-wepay