【问题标题】:How to emit events from celery task如何从 celery 任务中发出事件
【发布时间】:2018-03-22 08:51:38
【问题描述】:

基本上,我试图在单独的线程中的服务器端生成事件。我有一个应该发出事件的celery.task,但它的代码永远不会执行。

import json
import time

from celery import Celery

from flask import Flask
from flask import jsonify
from flask import render_template
from flask import request

from flask_socketio import SocketIO

broker_url = "redis://localhost:6379/1"

celery = Celery(broker=broker_url)
app = Flask(__name__)
socketio = SocketIO(app, message_queue=broker_url)


@celery.task
def countdown(n):
    print("countdown", n)
    for i in range(n+1):
        time.sleep(1)
        socketio.emit(
            "countdown",
            {"remaining": n - i},
            namespace="/test/"
        )


@app.route("/")
def index():
    return render_template("index.html")


@app.route("/start_countdown/", methods=["POST"])
def start_countdown():
    data = json.loads(request.data.decode())
    countdown.delay([int(data["time"])])
    return jsonify(time_to_wait=data["time"])


if __name__ == '__main__':
    socketio.run(debug=True)

视图响应正常,但任务无声,我无法理解,为什么?

UPD

我重新排列了我的代码,例如 here。文件夹结构完全一样,文件也一样。在app/main 文件夹中,我有额外的tasks.py 文件。

import time

from celery import Celery
from flask_socketio import emit

from app import socketio
from config import broker_url


celery = Celery(broker=broker_url)

@celery.task
def countdown(n):
    print(n)

    for i in range(n+1):
        time.sleep(1)
        print("Socket", socketio)
        print("Server", socketio.server)
        socketio.emit(
            "countdown",
            {"remaining": n - i},
            namespace="/test/"
        )

我使用 celery -A app.main.tasks worker 命令启动 celery worker。当countdown 任务的代码被执行时,它会因为这个异常而失败:

[2017-10-12 19:04:07,797: WARNING/ForkPoolWorker-1] 13
[2017-10-12 19:04:08,799: WARNING/ForkPoolWorker-1] <flask_socketio.SocketIO object at 0x7f07d2a0fc50>
[2017-10-12 19:04:08,803: ERROR/ForkPoolWorker-1] Task app.main.tasks.countdown[68ae2e43-6ab7-4d52-8b3a-a9aaff46c489] raised unexpected: AttributeError("'NoneType' object has no attribute 'emit'",)
Traceback (most recent call last):
  File "/path/to/venv/lib/python3.4/site-packages/celery/app/trace.py", line 374, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/path/to/venv/lib/python3.4/site-packages/celery/app/trace.py", line 629, in __protected_call__
    return self.run(*args, **kwargs)
  File "/path/to/tasks.py", line 24, in countdown
    namespace="/test/"
  File "/path/to/venv/lib/python3.4/site-packages/flask_socketio/__init__.py", line 357, in emit
    self.server.emit(event, *args, namespace=namespace, room=room,
AttributeError: 'NoneType' object has no attribute 'emit'

我的任务中的socketio.server 出于某种原因是None,而在app/main/events.py 文件中它是适当的对象。看起来在我的任务中socketio 对象没有完全初始化,可能是因为在 celery 进程中执行流程不同,但我不知道如何修复它。

【问题讨论】:

  • 你是如何从客户端连接到这个应用程序的?
  • @Miguel,这里是 index.html 和 magic.js gist.github.com/montreal91/8a9ba9033df1469ccd51e1c2e7a3ac9f。如果需要,我可以将它们添加到帖子中。 start_countdown被调用,但后台任务没有启动。
  • 除了您的主服务器之外,是否至少运行一个 Celery 工作进程?
  • 现在,是的。
  • 那么任务是否正在运行?您可以将日志记录添加到 Celery worker 以查看它是否启动了任务。

标签: python flask celery flask-socketio


【解决方案1】:

我在使用 Flask-SocketIO 和 Celery 时遇到了类似的问题。

正如其他人提到的above,我们无法通过 Celery 任务与客户端通信,因为它们是两个不同的进程。

我的解决方案是在 Flask 服务器上创建一个端点,该端点基本上将事件广播到客户端。该端点将采用如下所示的 json 数据:

{
    "event_name": "Event name",
    "room": "roomId", // optional field 
    "data": "<data you want to send>" // this is the data you want to send to the users
   "broadcast_secret": <secret> // This would ensure ensure that we the endpoint is not public to everyone. If this secret is invalid then we would not broadcast anything to the client.
}

这是端点的样子:


@app.route('/braodcast',methods = ['POST'])
def broadcast_to_client(self):
    json_data = self.get_json()
    event_name = json_data.get('event_name')
    room = json_data.get('room')
    data = json_data.get('data')
    broadcast_secret = json_data.get('broadcast_secret')
    status_code, emitted = 400,  False
    if event_name and broadcast_secret == os.getenv('BROADCAST_SECRET'):
        socketio.emit(event_name, data, room=room)
        status_code, emitted = 200, True

    return {'emitted': emitted}, status_code

然后我创建了一个辅助函数,它使用 requests 模块从我的 celery 任务向该端点发出 post 请求:

import requests

def send_socket_event_from_celery(event_name, data=None, room=None):
    app_host = os.getenv('APP_HOST')
    broadcast_path = f'{app_host}/broadcast'

    json_data = {
        'event_name': event_name,
        'data': data,
        'room': room,
        'broadcast_secret': os.getenv('BROADCAST_SECRET')
    }

    requests.post(broadcast_path, json=json_data)

现在在任何 celery 任务中,我需要在套接字 IO 中发出一个事件,我只需调用此方法即可。

send_socket_event_from_celery('sample-event', data, room=some_room)

【讨论】:

    【解决方案2】:

    我只能让它与 Redis 作为代理一起工作,浪费了数小时尝试将 RabbitMQ 作为代理,并且无法从 celery 任务中发出消息。确保使用 Flask SocketIO 构造函数创建一个新实例,而不是主 Flask 应用程序的 socketio 实例。

    【讨论】:

    • 我重新设计了整个系统,当前版本根本不使用 celery 或 RabbitMQ。我开始怀疑这些是否是 python 进程之间通信的有用工具。我宁愿使用 RabbitMQ 让两个用两种不同语言编写的程序相互交流。
    • 如果你放弃 RabbitMQ ,至少最初只是为了让它工作,那么它非常简单,只需使用 Redis + Celery + SocketIO。
    • 我现在遇到了同样的问题,使用烧瓶 socketio 构造函数创建一个新实例并不能解决它。从日志中我看到我能够发送附加到 celery task_success 事件的消息,但客户端没有收到它。
    【解决方案3】:

    您的 celery 任务是否在单独的 tasks.py 模块中?例如:

    from celery import Celery
    
    celery = Celery('tasks', broker=broker_url)
    socketio = SocketIO(app, message_queue=broker_url)
    
    @celery.task
    def countdown(n):
        print("countdown", n)
        for i in range(n+1):
            time.sleep(1)
            socketio.emit(
                "countdown",
                {"remaining": n - i},
                namespace="/test/"
            )
    

    那么你就可以在提示符下启动这个worker了:

     celery -A tasks worker --loglevel=info
    

    然后你应该运行你的主代码并添加:

     from tasks import countdown
    

    并调用将在单独进程中运行的倒计时函数

    【讨论】:

      猜你喜欢
      • 2015-12-11
      • 1970-01-01
      • 2021-01-24
      • 1970-01-01
      • 2013-11-20
      • 2014-12-15
      • 2012-09-06
      • 1970-01-01
      • 2020-08-11
      相关资源
      最近更新 更多