【Question Title】: Accessing celery worker instance inside the task
【Posted】: 2017-08-07 15:16:06
【Question】:

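I want to use a Jupyter kernel inside a Celery worker. Each Celery worker will have exactly one Jupyter kernel.

To achieve this, I am overriding Celery's default Worker class: the Jupyter kernel is started when the worker is initialized and shut down when the worker closes.

The problem I am facing is how to access that kernel instance from inside a task while the task is running.

Also, is there a better way to override the Celery application's Worker class than app.Worker = CustomWorker?

Here is the Celery configuration with the custom worker.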

from __future__ import absolute_import, unicode_literals
from celery import Celery
from jupyter_client import MultiKernelManager

app = Celery('proj',
    broker='redis://',
    backend='redis://',
    include=['tasks'])

app.conf.update(
    result_expires=3600
)

class CustomWorker(app.Worker):
    def __init__(self, *args, **kwargs):
        self.km = MultiKernelManager()
        self.kernel_id = self.km.start_kernel()
        print("Custom initializing")
        self.kernel_client = self.km.get_kernel(self.kernel_id).client()
        super(CustomWorker, self).__init__(*args, **kwargs)

    def on_close(self):
        self.km.shutdown_kernel(self.kernel_id)
        super(CustomWorker, self).on_close()

app.Worker = CustomWorker

if __name__ == '__main__':
    app.start()

Here is the skeleton of tasks.py.

from __future__ import absolute_import, unicode_literals
from celery import app

from functools import partial
from celery import Task
from tornado import gen
from tornado.concurrent import Future
from jupyter_client import MultiKernelManager
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ioloop.install()

reply_futures = {}

# This is my celery task, where I pass arbitrary Python code to execute on
# some celery worker (actually on the corresponding kernel)
@app.task
def pythontask(code):
    # I don't know how to get the kernel_client for current celery worker !!?
    kernel_client = self.get_current_worker().kernel_client
    mid = kernel_client.execute(code)

    # defining the callback which will be executed when message arrives on
    # zmq stream
    def reply_callback(session, stream, msg_list):
        idents, msg_parts = session.feed_identities(msg_list)
        reply = session.deserialize(msg_parts)
        parent_id = reply['parent_header'].get('msg_id')
        reply_future = reply_futures.get(parent_id)
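        # Several messages can share one parent msg_id; resolve the future only once.
        if reply_future and not reply_future.done():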
            reply_future.set_result(reply)

    @gen.coroutine
    def execute(kernel_client, code):
        # Register a future under the request's msg_id; reply_callback resolves it.
        msg_id = kernel_client.execute(code)
        f = reply_futures[msg_id] = Future()
        reply = yield f
        raise gen.Return(reply)

    # Initialize the zmq streams and attach the callback that receives messages
    # from the kernel
    shell_stream = ZMQStream(kernel_client.shell_channel.socket)
    iopub_stream = ZMQStream(kernel_client.iopub_channel.socket)
    shell_stream.on_recv_stream(partial(reply_callback, kernel_client.session))
    iopub_stream.on_recv_stream(partial(reply_callback, kernel_client.session))

    # Get the current IOLoop
    loop = ioloop.IOLoop.current()
    # Listen on the streams until the first matching message arrives
    reply = loop.run_sync(lambda: execute(kernel_client, code))
    print(reply)
    # Drop the bookkeeping entry for this request
    reply_futures.pop(reply['parent_header'].get('msg_id'), None)

    # Disable callback and automatic receiving.
    shell_stream.on_recv_stream(None)
    iopub_stream.on_recv_stream(None)
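
The reply-matching idea used in the task above (a future stored under each request's msg_id and resolved by the stream callback) can be tried in isolation. This is only a minimal sketch: it assumes asyncio in place of Tornado and ZMQStream, and `fake_send` and the message layout are hypothetical stand-ins, not part of jupyter_client.

```python
import asyncio
import uuid

reply_futures = {}  # msg_id -> future awaiting the matching reply


def reply_callback(reply):
    """Resolve the future whose msg_id matches the reply's parent header."""
    parent_id = reply["parent_header"].get("msg_id")
    future = reply_futures.get(parent_id)
    if future is not None and not future.done():
        future.set_result(reply)


async def execute(send, code):
    """Send code, then wait for the reply that names this request as its parent."""
    msg_id = uuid.uuid4().hex
    # Register the future before sending so a fast reply cannot be missed.
    future = reply_futures[msg_id] = asyncio.get_running_loop().create_future()
    send(msg_id, code)
    try:
        return await future
    finally:
        reply_futures.pop(msg_id, None)


def fake_send(msg_id, code):
    """Hypothetical transport: delivers a reply on the next loop iteration."""
    reply = {
        "parent_header": {"msg_id": msg_id},
        "content": {"status": "ok", "code": code},
    }
    asyncio.get_running_loop().call_soon(reply_callback, reply)


result = asyncio.run(execute(fake_send, "1 + 1"))
print(result["content"])
```

Registering the future before the request is sent, and removing it in a `finally` block, keeps the dictionary from growing when requests fail or time out.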

【Question Discussion】:

    Tags: python celery jupyter pyzmq


    【Solution 1】:

    Adding the worker instance's information to the request object solved my problem. To do that, I overrode the worker class's _process_task method.

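    # Method of CustomWorker. The module also needs `import traceback`,
    # `from celery.exceptions import TaskRevokedError`, and a module-level `logger`.
    def _process_task(self, req):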
      try:
        req.kwargs['kernel_client'] = self.kernel_client
        print("printing from _process_task {}".format(req.kwargs))
        req.execute_using_pool(self.pool)
      except TaskRevokedError:
        try:
          self._quick_release()   # Issue 877
        except AttributeError:
          pass
      except Exception as exc:
        logger.critical('Internal error: %r\n%s',exc, traceback.format_exc(), exc_info=True)
    

    Here is my task, which accesses kernel_client:

    @app.task(bind=True)
    def pythontask(self, code, kernel_client=None):
    
        mid = kernel_client.execute(code)
    
        print("{}".format(kernel_client))
        print("{}".format(mid))
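
The injection pattern above can be illustrated without Celery or Jupyter. This is a rough sketch only: `FakeWorker`, `FakeRequest`, and `FakeKernelClient` are hypothetical stand-ins invented for the illustration, not Celery or jupyter_client classes. The point is that the worker owns one client and adds it to each request's kwargs just before the task function runs.

```python
class FakeKernelClient:
    """Hypothetical stand-in; records code instead of sending it to a kernel."""

    def __init__(self):
        self.executed = []

    def execute(self, code):
        self.executed.append(code)
        return "msg-{}".format(len(self.executed))


class FakeRequest:
    """Hypothetical stand-in for a task request: a callable plus its arguments."""

    def __init__(self, func, args, kwargs):
        self.func, self.args, self.kwargs = func, args, kwargs

    def execute(self):
        return self.func(*self.args, **self.kwargs)


class FakeWorker:
    """Owns one kernel client and hands it to every task it runs."""

    def __init__(self):
        self.kernel_client = FakeKernelClient()

    def process_task(self, req):
        # Same idea as the overridden _process_task: inject, then execute.
        req.kwargs["kernel_client"] = self.kernel_client
        return req.execute()


def pythontask(code, kernel_client=None):
    return kernel_client.execute(code)


worker = FakeWorker()
first = worker.process_task(FakeRequest(pythontask, ("1 + 1",), {}))
second = worker.process_task(FakeRequest(pythontask, ("2 + 2",), {}))
print(first, second, worker.kernel_client.executed)
```

Both calls reach the same client object, which is what the one-kernel-per-worker requirement needs.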
    

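    This only works when I start the worker in solo mode; otherwise it raises a pickling error. Running solo workers is a requirement for me anyway, so this solution works for me.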
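
A likely explanation for the pickling error, offered as an assumption rather than something confirmed in this thread: with a process-based pool, the task's kwargs are pickled to reach the child process, and an object that holds sockets or locks cannot be pickled. With the solo pool (for example `--pool=solo`), the task runs in the worker's own process, so nothing is pickled. The sketch below reproduces the failure with a hypothetical `FakeKernelClient` whose `threading.Lock` stands in for the real client's unpicklable internals.

```python
import pickle
import threading


class FakeKernelClient:
    """Hypothetical stand-in; the lock mimics an unpicklable socket."""

    def __init__(self):
        self._lock = threading.Lock()


def try_pickle(obj):
    """Return None on success, or the exception raised while pickling."""
    try:
        pickle.dumps(obj)
    except Exception as exc:
        return exc
    return None


# kwargs as they would look after the worker injects the client
kwargs = {"code": "print(1)", "kernel_client": FakeKernelClient()}
error = try_pickle(kwargs)
print(type(error).__name__, error)

# kwargs holding only plain data pickle without trouble
plain_error = try_pickle({"code": "print(1)"})
print(plain_error)
```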

    【Discussion】:
