【问题标题】:How can I cancel a hanging asyncronous task in tornado, with a timeout?如何在龙卷风中取消挂起的异步任务,并超时?
【发布时间】:2015-03-10 15:16:38
【问题描述】:

我的设置是 python tornado 服务器,它使用 ThreadPoolExecutor 异步处理任务。在某些情况下,任务可能会变成无限循环。使用with_timeout 装饰器,我设法捕获了超时异常并将错误结果返回给客户端。问题是任务仍在后台运行。如何阻止任务在ThreadPoolExecutor 中运行?或者是否可以取消Future? 这是重现问题的代码。使用 tornado 4 和 concurrent.futures 库运行代码并转到 http://localhost:8888/test

from tornado.concurrent import run_on_executor
from tornado.gen import with_timeout
from tornado.ioloop import IOLoop
import tornado.web
from tornado import gen
from concurrent.futures import ThreadPoolExecutor
import datetime
MAX_WAIT_SECONDS = 10

class MainHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(2)

    @run_on_executor
    def test_func(self):
        ...
        #infinite loop might be here
        ...

    @tornado.gen.coroutine
    def get(self):
        future = self.test_func()
        try:
            result_search_struct = yield with_timeout(datetime.timedelta(seconds=MAX_WAIT_SECONDS), future )
            self.write({'status' : 0})
            self.finish()
        except Exception, e:
            #how to cancel the task here if it was timeout
            future.cancel() # <-- Does not work
            self.write({'status' : 100})
            self.finish()

application = tornado.web.Application([
    (r"/test", MainHandler),
])
application.listen(8888)
IOLoop.instance().start()

【问题讨论】:

    标签: python asynchronous tornado concurrent.futures


    【解决方案1】:

    Future 实例本身一旦实际执行就不能被取消,只有在它们处于挂起状态时才能被取消。注意到in the docs

    取消()

    尝试取消通话。如果调用当前正在执行且无法取消,则该方法将返回False, 否则调用将被取消,该方法将返回True

    因此,中止您在后台运行的方法的唯一方法是将逻辑实际插入到您可能的无限循环中,以便在您告诉它时可以中止它。在您的示例中,您可以使用 threading.Event:

    class MainHandler(tornado.web.RequestHandler):
        executor = ThreadPoolExecutor(2)
    
        @run_on_executor
        def test_func(self, event):
            i = 0
            while not event.is_set():
                print i
                i = i + 1
    
        @tornado.gen.coroutine
        def get(self):
            event = threading.Event()
            future = self.test_func(event)
            try:
                result_search_struct = yield with_timeout(datetime.timedelta(seconds=MAX_WAIT_SECONDS), future )
                self.write({'status' : 0})
                self.finish()
            except Exception, e:
                future.cancel() # Might not work, depending on how busy the Executor is
                event.set()
                self.write({'status' : 100})
                self.finish()
    
    application = tornado.web.Application([
        (r"/test", MainHandler),
    ])
    

    【讨论】:

    • dano,谢谢回复,问题是没有真正的“循环”。有一个微妙的错误仍在调查中,它会导致任务内部出现无限循环。作为临时解决方案,我想取消挂起的线程 atm。我不确定如何在当前的代码结构中做到这一点。
    • 我已经修改了我的代码,所以它不会混淆有真正的循环
    • @EgorLakomkin 请参阅this question,了解如何在 Python 中杀死线程。有些事情你可以做,在某些情况下会起作用,但实际上并没有一种方法可以适用于所有用例。那里的大多数方法都特别有问题,因为您使用的是ThreadPoolExecutor,而不是直接使用Thread 对象。
    猜你喜欢
    • 2016-02-11
    • 1970-01-01
    • 2018-12-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多