【问题标题】:Writing a TTL decorator in Python用 Python 编写一个 TTL 装饰器
【发布时间】:2009-02-23 07:16:49
【问题描述】:

我正在尝试用 Python 编写一个 TTL 装饰器。基本上,如果函数没有在选定的时间内返回结果,我就抛出一个异常。

你可以在 http://sebulba.wikispaces.com/recipe+thread2 找到 thread2 的代码片段

from thread2 import Thread

"""  A TTL decorator. """
class Worker(Thread):
    """Thread that runs ``f(*args, **kvargs)`` and reports the outcome on ``q``.

    The outcome is put on the queue as a ``(valid, payload)`` pair:
    ``(True, result)`` when ``f`` returns, ``(False, exception)`` when ``f``
    raises an ``Exception``.
    """

    def __init__(self, q, f, args, kvargs):
        Thread.__init__(self)

        self.q = q
        self.f = f
        self.args = args
        self.kvargs = kvargs

    def run(self):
        try:
            res = (True, self.f(*self.args, **self.kvargs))
            self.q.put(res)
        except Exception as e:
            # ``except X as e`` is accepted by Python 2.6+ and Python 3; the
            # older ``except X, e`` spelling is a SyntaxError on Python 3.
            self.q.put((False, e))

class Referee(Thread):
    """Timekeeper thread: after ``ttl`` seconds, post a failure on ``q``.

    The failure is the pair ``(False, exception_factory())``, the same shape
    the queue consumer expects for an unsuccessful call.
    """

    def __init__(self, q, ttl, exception_factory):
        Thread.__init__(self)

        self.q, self.ttl = q, ttl
        self.exception_factory = exception_factory

    def run(self):
        # Sleep for the whole time-to-live first, then build the exception.
        time.sleep(self.ttl)
        timeout_error = self.exception_factory()
        self.q.put((False, timeout_error))

def raise_if_too_long(ttl, exception_factory=lambda :RuntimeError("Timeout")):
    """Build a decorator that gives the decorated function a time-to-live.

    Each call of the decorated function starts a ``Worker`` thread running the
    function and a ``Referee`` thread sleeping for ``ttl`` seconds. Both post
    a ``(valid, payload)`` pair on a shared queue; whichever pair arrives
    first decides the outcome of the call.

    Args:
        ttl: number of seconds passed to the ``Referee`` before it posts the
            timeout failure.
        exception_factory: zero-argument callable producing the exception to
            raise on timeout (default: ``RuntimeError("Timeout")``).

    Returns:
        A decorator. The decorated function returns the wrapped function's
        result, or raises either the wrapped function's exception or the
        exception built by ``exception_factory``.
    """
    # Imported locally because the snippet has no standard-library import block.
    import functools

    def raise_if_too_long_aux(f):
        # Keep the wrapped function's __name__ / __doc__ on the wrapper.
        @functools.wraps(f)
        def ritl(*args, **kvargs):
            q = Queue.Queue(2)

            referee = Referee(q, ttl, exception_factory)
            worker = Worker(q, f, args, kvargs)

            worker.start()
            referee.start()

            # Blocking get: wait for the first pair posted by either thread.
            (valid, res) = q.get(True)

            q.task_done()

            # NOTE(review): terminate() comes from the thread2 recipe, which is
            # not visible here. If it raises for a thread that has already
            # finished, a call that returned normally would fail at this
            # point -- confirm against thread2.
            referee.terminate()
            worker.terminate()

            if valid:
                return res
            raise res

        return ritl

    return raise_if_too_long_aux

但是,我得到了一些非常糟糕的结果。似乎有时函数已经正常返回,但装饰器要等到达到 TTL 并抛出错误时才返回。

您发现这段代码有什么问题吗? 有没有一种通用的方法/库在 python 中编写带有 TTL 的函数?

【问题讨论】:

    标签: python decorator ttl


    【解决方案1】:

    提供的代码有点难以理解——它如何在正确的线程中正确的时间、正确的位置引发异常?

    考虑一下这个粗略的流程:

    装饰器函数与目标函数一起调用。返回一个函数:

    1. 启动线程,调用目标函数
    2. 使用 thread.join([timeout]) 加入线程
    3. 如果超时,引发异常,并忽略线程的结果。
    4. 如果没有超时,捕获线程的结果并返回。

    (您需要设计一种方法来捕获线程的输出......)

    有关线程超时的信息,请参阅http://docs.python.org/library/threading.html...

    (或者直接使用erlang :)

    【讨论】:

    • 谢谢它真的很有帮助。我不知道 thread.join(timeout)。
    【解决方案2】:

    如果您希望在超过超时后终止函数的执行,您可能需要尝试具有该功能的代码。要使用该模块,只需以您的函数作为参数调用 add_timeout,其返回值是一个可以运行的对象。调用之后,可以轮询该对象的 ready 属性,并通过 value 属性访问返回的任何内容。代码的文档应该对其余可用的 API 作出说明。

    [因长度而编辑的代码]请参阅 timeout.py 获取源代码。


    附录:

    还有另一种看待问题的方法。您可以将函数及其参数提交给一个执行引擎,该引擎可以强制执行它可以运行多长时间的超时,而不是装饰一个函数以使其具有生存时间。我上面的原始答案是大约八年前对这个问题的解决方案。在进一步研究该主题后,我可以推荐以下模块吗?其后的模块用于测试和演示asynchronous 模块的用法。

    asynchronous.py

    #! /usr/bin/env python3
    import abc as _abc
    import collections as _collections
    import enum as _enum
    import math as _math
    import multiprocessing as _multiprocessing
    import operator as _operator
    import queue as _queue
    import signal as _signal
    import sys as _sys
    import time as _time
    
    # Public API: the Executor class plus the module-level convenience
    # functions that this module binds to a shared default Executor.
    __all__ = (
        'Executor',
        'get_timeout',
        'set_timeout',
        'submit',
        'map_',
        'shutdown'
    )
    
    
    class _Base(metaclass=_abc.ABCMeta):
        """Abstract base that stores a validated, strictly positive timeout."""

        __slots__ = (
            '__timeout',
        )

        @_abc.abstractmethod
        def __init__(self, timeout):
            """Store *timeout*; ``None`` means no limit (``math.inf``).

            The value goes through the ``timeout`` property, so the same
            validation as ``set_timeout`` applies.
            """
            self.timeout = _math.inf if timeout is None else timeout

        def get_timeout(self):
            """Return the currently configured timeout."""
            return self.__timeout

        def set_timeout(self, value):
            """Validate and store a new timeout.

            Raises:
                TypeError: if *value* is not a float or an int.
                ValueError: if *value* is not greater than zero. NaN is
                    rejected too, since it compares false against everything.
            """
            if not isinstance(value, (float, int)):
                raise TypeError('value must be of type float or int')
            # ``not value > 0`` rather than ``value <= 0``: the latter lets NaN
            # through, which would then poison every elapsed-time comparison.
            if not value > 0:
                raise ValueError('value must be greater than zero')
            self.__timeout = value

        timeout = property(get_timeout, set_timeout)
    
    
    def _run_and_catch(fn, args, kwargs):
        """Call ``fn(*args, **kwargs)`` and report the outcome as a pair.

        Returns ``(False, result)`` when the call returns, or
        ``(True, exception)`` when it raises anything at all
        (``BaseException`` subclasses included).
        """
        # noinspection PyBroadException
        try:
            outcome = fn(*args, **kwargs)
        except BaseException as problem:
            return True, problem
        return False, outcome
    
    
    def _run(fn, args, kwargs, queue):
        """Run the call and push its ``(error, value)`` outcome onto *queue*.

        The outcome is pushed without blocking, so a queue that is already
        full makes ``put_nowait`` raise instead of waiting.
        """
        outcome = _run_and_catch(fn, args, kwargs)
        queue.put_nowait(outcome)
    
    
    class _State(_enum.IntEnum):
        """Life-cycle states of a future, ordered so they can be compared."""

        # Explicit values match what ``auto()`` assigns (1, 2, 3, ...).
        PENDING = 1
        RUNNING = 2
        CANCELLED = 3
        FINISHED = 4
        ERROR = 5
    
    
    def _run_and_catch_loop(iterable, *args, **kwargs):
        """Call every callable in *iterable* with the same arguments.

        A failing callable does not stop the loop; once all of them have run,
        the exception raised by the last one that failed is re-raised.
        """
        last_error = None
        for callback in iterable:
            failed, outcome = _run_and_catch(callback, args, kwargs)
            last_error = outcome if failed else last_error
        if last_error:
            raise last_error
    
    
    class _Future(_Base):
        """Handle for one call executed in a separate daemon process.

        The child process sends its outcome back through a one-slot
        multiprocessing queue as an ``(error, value)`` pair. Until an outcome
        has been collected, the stored result is ``(True, TimeoutError())``,
        so ``result()`` raises ``TimeoutError`` and ``exception()`` returns it.
        """

        __slots__ = (
            '__queue',
            '__process',
            '__start_time',
            '__callbacks',
            '__result'
        )

        def __init__(self, timeout, fn, args, kwargs):
            """Prepare (but do not start) the process that will run the call."""
            super().__init__(timeout)
            self.__queue = _multiprocessing.Queue(1)
            self.__process = _multiprocessing.Process(
                target=_run,
                args=(fn, args, kwargs, self.__queue),
                daemon=True
            )
            # inf until the process is started, so the elapsed time computed
            # in __auto_cancel is -inf and can never exceed the timeout.
            self.__start_time = _math.inf
            self.__callbacks = _collections.deque()
            # Placeholder outcome, replaced once a real one is collected.
            self.__result = True, TimeoutError()

        @property
        def __state(self):
            """Derive the current ``_State`` from the process pid and exit code."""
            pid, exitcode = self.__process.pid, self.__process.exitcode
            return (_State.PENDING if pid is None else
                    _State.RUNNING if exitcode is None else
                    _State.CANCELLED if exitcode == -_signal.SIGTERM else
                    _State.FINISHED if exitcode == 0 else
                    _State.ERROR)

        def __repr__(self):
            """Show the state and, once past RUNNING, the kind of stored outcome."""
            root = f'{type(self).__name__} at {id(self)} state={self.__state.name}'
            # PENDING and RUNNING futures have no outcome worth describing yet.
            if self.__state < _State.CANCELLED:
                return f'<{root}>'
            error, value = self.__result
            suffix = f'{"raised" if error else "returned"} {type(value).__name__}'
            return f'<{root} {suffix}>'

        def __consume_callbacks(self):
            """Yield the registered callbacks, removing each one as it is handed out."""
            while self.__callbacks:
                yield self.__callbacks.popleft()

        def __invoke_callbacks(self):
            """Wait for the process to exit, then run every pending callback.

            Callbacks receive this future; the loop helper re-raises the last
            callback exception only after all callbacks have been called.
            """
            self.__process.join()
            _run_and_catch_loop(self.__consume_callbacks(), self)

        def cancel(self):
            """Terminate the process and run the done-callbacks."""
            self.__process.terminate()
            self.__invoke_callbacks()

        def __auto_cancel(self):
            """Cancel the call if it has outlived the timeout; return elapsed time."""
            elapsed_time = _time.perf_counter() - self.__start_time
            if elapsed_time > self.timeout:
                self.cancel()
            return elapsed_time

        def cancelled(self):
            """Return True if the process was terminated (applies the timeout first)."""
            self.__auto_cancel()
            return self.__state is _State.CANCELLED

        def running(self):
            """Return True while the process is alive (applies the timeout first)."""
            self.__auto_cancel()
            return self.__state is _State.RUNNING

        def done(self):
            """Return True for any state past RUNNING (applies the timeout first)."""
            self.__auto_cancel()
            return self.__state > _State.RUNNING

        def __handle_result(self, error, value):
            """Store the collected outcome and run the done-callbacks."""
            self.__result = error, value
            self.__invoke_callbacks()

        def __ensure_termination(self):
            """Collect the outcome, waiting for it if the call is still in flight.

            An outcome already sitting in the queue is taken immediately.
            Otherwise, while the state is PENDING or RUNNING, block on the
            queue for the time left before the timeout and cancel the call
            if nothing arrives in that time.
            """
            elapsed_time = self.__auto_cancel()
            if not self.__queue.empty():
                self.__handle_result(*self.__queue.get_nowait())
            elif self.__state < _State.CANCELLED:
                remaining_time = self.timeout - elapsed_time
                # Queue.get takes None, not inf, to mean "wait forever".
                if remaining_time == _math.inf:
                    remaining_time = None
                try:
                    result = self.__queue.get(True, remaining_time)
                except _queue.Empty:
                    self.cancel()
                else:
                    self.__handle_result(*result)

        def result(self):
            """Return the call's return value, or raise the stored exception."""
            self.__ensure_termination()
            error, value = self.__result
            if error:
                raise value
            return value

        def exception(self):
            """Return the stored exception, or None if the call returned normally."""
            self.__ensure_termination()
            error, value = self.__result
            if error:
                return value

        def add_done_callback(self, fn):
            """Call *fn* with this future now if it is done, else queue it for later."""
            if self.done():
                fn(self)
            else:
                self.__callbacks.append(fn)

        def _set_running_or_notify_cancel(self):
            """Start the process if still PENDING; in any other state, cancel."""
            if self.__state is _State.PENDING:
                self.__process.start()
                self.__start_time = _time.perf_counter()
            else:
                self.cancel()
    
    
    class Executor(_Base):
        """Runs callables in separate processes under a shared timeout.

        Every submitted call gets the executor's timeout as it is at
        submission time. The executor is also a context manager: leaving the
        ``with`` block cancels whatever is still tracked.
        """

        __slots__ = ('__futures',)

        def __init__(self, timeout=None):
            super().__init__(timeout)
            self.__futures = set()

        def submit(self, fn, *args, **kwargs):
            """Start ``fn(*args, **kwargs)`` and return the future tracking it."""
            task = _Future(self.timeout, fn, args, kwargs)
            # Track the future until it is done; the callback untracks it.
            self.__futures.add(task)
            task.add_done_callback(self.__futures.remove)
            # noinspection PyProtectedMember
            task._set_running_or_notify_cancel()
            return task

        @staticmethod
        def __cancel_futures(iterable):
            """Cancel every future in *iterable*, re-raising the last failure."""
            cancel_methods = map(_operator.attrgetter('cancel'), iterable)
            _run_and_catch_loop(cancel_methods)

        def map(self, fn, *iterables):
            """Submit ``fn`` over the zipped *iterables*; yield results in order.

            All calls are submitted immediately. If iteration stops early
            (an exception or a closed generator), the futures not yet
            consumed are cancelled.
            """
            submitted = tuple(self.submit(fn, *row) for row in zip(*iterables))

            def result_iterator():
                remaining = iter(submitted)
                try:
                    for task in remaining:
                        yield task.result()
                finally:
                    # Whatever the loop did not reach is still in `remaining`.
                    self.__cancel_futures(remaining)

            return result_iterator()

        def shutdown(self):
            """Cancel every future that is still tracked."""
            # Snapshot first: the done-callbacks mutate the set while cancelling.
            snapshot = frozenset(self.__futures)
            self.__cancel_futures(snapshot)

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.shutdown()
            return False
    
    
    # Module-level convenience API: one shared default Executor whose bound
    # methods are exported as plain functions. ``map_`` carries a trailing
    # underscore, presumably to avoid shadowing the builtin ``map``.
    _executor = Executor()
    get_timeout = _executor.get_timeout
    set_timeout = _executor.set_timeout
    submit = _executor.submit
    map_ = _executor.map
    shutdown = _executor.shutdown
    # Drop the module-level name; the bound methods above still reference
    # the instance.
    del _executor
    

    test_asynchronous.py

    #! /usr/bin/env python3
    import _thread
    import atexit
    import functools
    import inspect
    import io
    import math
    import operator
    import os
    import sys
    import time
    import unittest
    
    import asynchronous
    
    
    # noinspection PyUnresolvedReferences
    class TestConstructor:
        """Mixin checking how ``self.CLASS`` validates its ``timeout`` argument."""

        def instantiate(self, *args):
            """Build ``self.CLASS`` from as many leading *args* as it accepts."""
            accepted = len(inspect.signature(self.CLASS).parameters)
            return self.CLASS(*args[:accepted])

        def test_valid_timeout(self):
            # None maps to "no limit"; an int is stored unchanged.
            for given, expected in ((None, math.inf), (1, 1)):
                instance = self.instantiate(given, print, (), {})
                self.assertEqual(instance.get_timeout(), expected)
            # Euler's identity: e**(i*pi) + 1 is zero up to rounding; the
            # imaginary residue is expected to be a tiny positive float.
            tiny = (math.e ** (1j * math.pi) + 1).imag
            self.assertIsInstance(tiny, float)
            instance = self.instantiate(tiny, print, (), {})
            self.assertEqual(instance.get_timeout(), tiny)

        def test_error_timeout(self):
            # A string is the wrong type; zero and negatives are out of range.
            rejected = ((TypeError, '60'), (ValueError, 0), (ValueError, -1))
            for exception, bad_timeout in rejected:
                self.assertRaises(
                    exception, self.instantiate, bad_timeout, print, (), {}
                )
    
    
    # noinspection PyUnresolvedReferences
    class TestTimeout(TestConstructor):
        """Mixin exercising the ``timeout`` property of ``self.CLASS``."""

        def test_valid_property(self):
            instance = self.instantiate(None, None, None, None)
            # The stored value keeps the type it was assigned with.
            for value, expected_type in ((1, int), (1 / 2, float)):
                instance.timeout = value
                self.assertIsInstance(instance.timeout, expected_type)
            # An arbitrarily large integer (128 random bytes) is accepted too.
            kilo_bit = int.from_bytes(os.urandom(1 << 7), 'big')
            instance.timeout = kilo_bit
            self.assertEqual(instance.timeout, kilo_bit)

        def test_error_property(self):
            instance = self.instantiate(None, None, None, None)
            rejected = (
                (TypeError, 'inf'),
                (TypeError, complex(123456789, 0)),
                (ValueError, 0),
                (ValueError, 0.0),
                (ValueError, -1),
                (ValueError, -math.pi),
            )
            for exception, bad_value in rejected:
                with self.assertRaises(exception):
                    instance.timeout = bad_value
                # A rejected assignment must leave the old timeout in place.
                self.assertEqual(instance.timeout, math.inf)
    
    
    class Timer:
        """Per-thread stopwatch for checking how long an operation took."""

        # Start times keyed by thread identifier.
        __timers = {}

        @classmethod
        def start_timer(cls):
            """Record the start time for the calling thread.

            Raises KeyError (carrying the thread id) if this thread already
            has a timer running; the running timer is left untouched.
            """
            ident, now = _thread.get_ident(), time.perf_counter()
            if ident in cls.__timers:
                raise KeyError(ident)
            cls.__timers[ident] = now

        @classmethod
        def stop_timer(cls, expected_time, error=None):
            """Stop the calling thread's timer and compare against a target.

            Returns True when the elapsed time is within *error* seconds of
            *expected_time* (default tolerance: a quarter second). Raises
            KeyError if the calling thread has no timer running.
            """
            tolerance = 1 / 4 if error is None else error
            started = cls.__timers.pop(_thread.get_ident())
            deviation = (time.perf_counter() - started) - expected_time
            return abs(deviation) <= tolerance
    
    
    # noinspection PyUnresolvedReferences
    class TestTimer(Timer):
        """Timer mixin for test cases: a missed deadline fails the test."""

        def stop_timer(self, expected_time, error=None):
            within_tolerance = super().stop_timer(expected_time, error)
            self.assertTrue(within_tolerance)
    
    
    def delay_run(delay, fn, *args, sync=True, **kwargs):
        """Sleep for *delay* seconds, then call ``fn(*args, **kwargs)``.

        With ``sync=True`` (the default) the sleep and the call happen in the
        calling thread and the call's result is returned. With ``sync=False``
        they happen in a new thread and None is returned right away.
        """
        def delayed_call():
            time.sleep(delay)
            return fn(*args, **kwargs)

        if not sync:
            _thread.start_new_thread(delayed_call, ())
            return None
        return delayed_call()
    
    
    # noinspection PyUnresolvedReferences
    class TestModuleOrInstance(TestTimer):
        """Mixin with tests shared by the module-level API and Executor objects.

        Concrete test cases provide ``MODULE_OR_INSTANCE`` (the object under
        test) and ``NAME_OF_MAP`` (the name of its map function).
        """

        @property
        def moi(self):
            # Short alias for the module or instance under test.
            return self.MODULE_OR_INSTANCE

        def test_valid_timeout(self):
            self.moi.set_timeout(math.inf)
            self.assertEqual(self.moi.get_timeout(), math.inf)
            self.moi.set_timeout(60)
            self.assertEqual(self.moi.get_timeout(), 60)
            self.moi.set_timeout(0.05)
            self.assertEqual(self.moi.get_timeout(), 0.05)

        def test_error_timeout(self):
            # Each rejected value must leave the previously set timeout intact.
            # Unlike the constructor, set_timeout does not accept None.
            self.moi.set_timeout(math.inf)
            self.assertRaises(TypeError, self.moi.set_timeout, None)
            self.assertEqual(self.moi.get_timeout(), math.inf)
            self.assertRaises(ValueError, self.moi.set_timeout, 0)
            self.assertEqual(self.moi.get_timeout(), math.inf)
            self.assertRaises(ValueError, self.moi.set_timeout, -1)
            self.assertEqual(self.moi.get_timeout(), math.inf)

        def run_submit_check(self):
            # Helper: a half-second task must finish with the right value in
            # about half a second under whatever timeout the caller has set.
            self.start_timer()
            future = self.moi.submit(delay_run, 0.5, operator.add, 1, 2)
            self.assertRegex(repr(future), r'^<_Future at \d+ state=RUNNING>$')
            self.assertEqual(future.result(), 3)
            self.stop_timer(0.5)
            self.assertRegex(
                repr(future),
                r'^<_Future at \d+ state=FINISHED returned int>$'
            )

        def test_submit_one_second_timeout(self):
            self.moi.set_timeout(1)
            self.run_submit_check()

        def test_submit_no_timeout(self):
            self.moi.set_timeout(math.inf)
            self.run_submit_check()

        def test_submit_short_timeout(self):
            # A one-second task under a half-second timeout is cancelled after
            # about half a second and reports TimeoutError.
            self.moi.set_timeout(0.5)
            self.start_timer()
            future = self.moi.submit(delay_run, 1, operator.add, 1, 2)
            self.assertRegex(repr(future), r'^<_Future at \d+ state=RUNNING>$')
            self.assertIsInstance(future.exception(), TimeoutError)
            self.stop_timer(0.5)
            self.assertRegex(
                repr(future),
                r'^<_Future at \d+ state=CANCELLED raised TimeoutError>$'
            )

        def run_map(self, *args):
            # Helper: call the map function (named by NAME_OF_MAP) with
            # delay_run as the mapped callable.
            return getattr(self.moi, self.NAME_OF_MAP)(delay_run, *args)

        def test_valid_map(self):
            # Four one-second additions, each summing to 3, all within the
            # 1.5 second timeout.
            self.moi.set_timeout(1.5)
            for result in self.run_map(
                    [1, 1, 1, 1],
                    [operator.add] * 4,
                    [0, 1, 2, 3],
                    [3, 2, 1, 0]
            ):
                self.assertEqual(result, 3)

        def test_error_map(self):
            # The third task sleeps 2 seconds, longer than the 1.5 second
            # timeout, so iteration raises TimeoutError after two results.
            self.moi.set_timeout(1.5)
            success = 0
            with self.assertRaises(TimeoutError):
                for result in self.run_map(
                        [1, 1, 2, 1],
                        [operator.add] * 4,
                        [0, 1, 2, 3],
                        [3, 2, 1, 0]
                ):
                    self.assertEqual(result, 3)
                    success += 1
            self.assertEqual(success, 2)

        def run_shutdown_check(self, running, future):
            # Done-callback: a future stopped by shutdown must report
            # TimeoutError; it is then struck off the `running` set.
            self.assertRaises(TimeoutError, future.result)
            running.remove(future)

        def run_submit_loop(self, executor):
            # Helper: submit ten two-second tasks, each with the done-callback
            # above, and return the set of futures after half a second.
            running = set()
            done_callback = functools.partial(self.run_shutdown_check, running)
            for _ in range(10):
                future = executor.submit(delay_run, 2, operator.add, 10, 20)
                running.add(future)
                future.add_done_callback(done_callback)
            time.sleep(0.5)
            return running

        def test_valid_shutdown(self):
            # shutdown() must cancel every task, emptying the `running` set.
            self.moi.set_timeout(1.5)
            running = self.run_submit_loop(self.moi)
            self.moi.shutdown()
            self.assertFalse(running)

        def test_error_shutdown(self):
            # One future is removed up front, so its callback's set.remove
            # raises KeyError; shutdown must propagate that error and still
            # cancel all the other futures.
            self.moi.set_timeout(1.5)
            running = self.run_submit_loop(self.moi)
            running.pop()
            self.assertRaises(KeyError, self.moi.shutdown)
            self.assertFalse(running)
    
    
    class TestExecutorAPI(TestTimeout, TestModuleOrInstance, unittest.TestCase):
        """Runs the shared tests against an Executor instance, plus the
        context-manager tests that only apply to Executor objects."""

        CLASS = asynchronous.Executor
        # One Executor instance shared by every test in this class.
        MODULE_OR_INSTANCE = CLASS()
        NAME_OF_MAP = 'map'

        def test_valid_context_manager(self):
            # Leaving the with-block must cancel every task still running.
            with self.instantiate(1.5) as executor:
                running = self.run_submit_loop(executor)
            self.assertFalse(running)

        def test_error_context_manager(self):
            # An exception raised inside the block propagates unchanged, and
            # the running tasks are still cancelled on the way out.
            error = Exception()
            with self.assertRaises(Exception) as cm:
                with self.instantiate(1.5) as executor:
                    running = self.run_submit_loop(executor)
                    raise error
            self.assertIs(cm.exception, error)
            self.assertFalse(running)
            # A failing done-callback (KeyError from the removed future)
            # surfaces from the with-block exit.
            with self.assertRaises(KeyError):
                with self.instantiate(1.5) as executor:
                    running = self.run_submit_loop(executor)
                    running.pop()
            self.assertFalse(running)
    
    
    class TestModuleAPI(TestModuleOrInstance, unittest.TestCase):
        """Runs the shared tests against the module-level functions of
        ``asynchronous`` (where the map function is named ``map_``)."""

        MODULE_OR_INSTANCE = asynchronous
        NAME_OF_MAP = 'map_'
    
    
    def verify_error():
        """Check the captured stderr text for the expected ``queue.Full`` line.

        If a line consisting of exactly ``queue.Full`` is present, the
        captured text is dropped. Otherwise all of it is forwarded to the
        real stderr (``sys.__stderr__``) so nothing unexpected is lost.
        """
        captured = sys.stderr
        captured.seek(0, io.SEEK_SET)
        if any(line == 'queue.Full\n' for line in captured):
            return
        captured.seek(0, io.SEEK_SET)
        real_stderr = sys.__stderr__
        real_stderr.write(captured.read())
        real_stderr.flush()
    
    
    def cause_error(obj):
        """Fill the caller's result queue with *obj* ahead of the real result.

        Meant to run as the target of a future: stderr is swapped for an
        in-memory buffer, ``verify_error`` is registered to inspect that
        buffer at exit, and *obj* is pushed onto the ``queue`` local found
        two frames up the call stack.
        """
        sys.stderr = io.StringIO()
        atexit.register(verify_error)
        # The frame depth matters: the queue is looked up in the locals of
        # the caller's caller, so this lookup must stay in this function.
        caller_frame = inspect.currentframe().f_back
        queue_owner_frame = caller_frame.f_back
        result_queue = queue_owner_frame.f_locals['queue']
        result_queue.put_nowait(obj)
    
    
    def return_(obj):
        """Return *obj* unchanged (used by the tests as a call target)."""
        return obj
    
    
    # noinspection PyUnusedLocal
    def throw(exception, *args):
        """Raise *exception*; any extra positional arguments are ignored.

        The extra arguments let this double as a done-callback, which is
        called with the future as an argument.
        """
        raise exception
    
    
    class TestFutureAPI(TestTimer, TestTimeout, unittest.TestCase):
        """Tests for the ``_Future`` class, driven directly through its
        ``_set_running_or_notify_cancel`` entry point."""

        CLASS = asynchronous._Future

        def test_valid_representation(self):
            future = self.instantiate(None, time.sleep, (0.1,), {})
            self.assertRegex(repr(future), r'^<_Future at \d+ state=PENDING>$')
            future._set_running_or_notify_cancel()
            self.assertRegex(repr(future), r'^<_Future at \d+ state=RUNNING>$')
            # A second call on a future that is no longer PENDING cancels it.
            future._set_running_or_notify_cancel()
            self.assertRegex(
                repr(future),
                r'^<_Future at \d+ state=CANCELLED raised TimeoutError>$'
            )
            future = self.instantiate(None, time.sleep, (0.1,), {})
            future._set_running_or_notify_cancel()
            time.sleep(0.5)
            # The process has exited, but the outcome has not been collected
            # yet, so repr still shows the TimeoutError placeholder.
            self.assertRegex(
                repr(future),
                r'^<_Future at \d+ state=FINISHED raised TimeoutError>$'
            )
            # exception() collects the real outcome from the queue.
            self.assertIs(future.exception(), None)
            self.assertRegex(
                repr(future),
                r'^<_Future at \d+ state=FINISHED returned NoneType>$'
            )

        def test_error_representation(self):
            # cause_error pushes its argument onto the result queue itself.
            # None cannot be unpacked into (error, value), hence TypeError.
            future = self.instantiate(0.5, cause_error, (None,), {})
            future._set_running_or_notify_cancel()
            self.assertRaises(TypeError, future.result)
            # Nothing valid was stored, so the placeholder is still reported.
            self.assertIsInstance(future.exception(), TimeoutError)
            self.assertRegex(
                repr(future),
                r'^<_Future at \d+ state=ERROR raised TimeoutError>$'
            )
            # A well-formed (error, value) pair is accepted as the outcome
            # even though the process itself ends in the ERROR state.
            future = self.instantiate(0.5, cause_error, ((False, 'okay'),), {})
            future._set_running_or_notify_cancel()
            self.assertEqual(future.result(), 'okay')
            self.assertRegex(
                repr(future),
                r'^<_Future at \d+ state=ERROR returned str>$'
            )

        def test_cancel(self):
            future = self.instantiate(None, time.sleep, (0.1,), {})
            # Cancelling before the process was started is expected to fail.
            self.assertRaises(AttributeError, future.cancel)
            future._set_running_or_notify_cancel()
            future.cancel()
            self.assertTrue(future.cancelled())
            future = self.instantiate(None, time.sleep, (0.1,), {})
            checker = set()
            future.add_done_callback(checker.add)
            future._set_running_or_notify_cancel()
            # Cancelling twice must run the callback only once.
            future.cancel()
            future.cancel()
            self.assertIs(checker.pop(), future)
            self.assertFalse(checker)

        def test_cancelled(self):
            # Not cancelled while pending, running, or after a normal finish.
            future = self.instantiate(None, time.sleep, (0.1,), {})
            self.assertFalse(future.cancelled())
            future._set_running_or_notify_cancel()
            self.assertFalse(future.cancelled())
            self.assertIs(future.result(), None)
            self.assertFalse(future.cancelled())
            # Cancelled after an explicit cancel().
            future = self.instantiate(None, time.sleep, (0.1,), {})
            future._set_running_or_notify_cancel()
            future.cancel()
            self.assertTrue(future.cancelled())
            # Cancelled once the 0.1 second timeout has elapsed; cancelled()
            # itself applies the timeout.
            future = self.instantiate(0.1, time.sleep, (1,), {})
            future._set_running_or_notify_cancel()
            time.sleep(0.5)
            self.assertTrue(future.cancelled())

        def test_running(self):
            # Running only between start and completion.
            future = self.instantiate(None, time.sleep, (0.1,), {})
            self.assertFalse(future.running())
            future._set_running_or_notify_cancel()
            self.assertTrue(future.running())
            self.assertIs(future.result(), None)
            self.assertFalse(future.running())
            # Not running after an explicit cancel().
            future = self.instantiate(None, time.sleep, (0.1,), {})
            future._set_running_or_notify_cancel()
            future.cancel()
            self.assertFalse(future.running())
            # Not running once the timeout has elapsed.
            future = self.instantiate(0.1, time.sleep, (1,), {})
            future._set_running_or_notify_cancel()
            time.sleep(0.5)
            self.assertFalse(future.running())

        def test_done(self):
            future = self.instantiate(None, time.sleep, (0.1,), {})
            self.assertFalse(future.done())
            future._set_running_or_notify_cancel()
            self.assertFalse(future.done())
            self.assertIs(future.result(), None)
            self.assertTrue(future.done())
            # A call that raises (time.sleep(None) -> TypeError) is done too.
            future = self.instantiate(None, time.sleep, (None,), {})
            future._set_running_or_notify_cancel()
            self.assertIsInstance(future.exception(), TypeError)
            self.assertTrue(future.done())

        def test_result_immediate(self):
            # A one-mebibyte payload must come back intact.
            data = os.urandom(1 << 20)
            future = self.instantiate(None, return_, (data,), {})
            future._set_running_or_notify_cancel()
            self.assertEqual(future.result(), data)
            test_exception = Exception('test')
            future = self.instantiate(None, throw, (test_exception,), {})
            future._set_running_or_notify_cancel()
            with self.assertRaises(Exception) as cm:
                future.result()
            # The exception crosses a process boundary, so compare type and
            # args rather than identity.
            self.assertIsInstance(cm.exception, type(test_exception))
            self.assertEqual(cm.exception.args, test_exception.args)

        def test_result_delay(self):
            # No timeout, no delay.
            future = self.instantiate(None, delay_run, (0, operator.add, 1, 2), {})
            self.start_timer()
            future._set_running_or_notify_cancel()
            self.assertEqual(future.result(), 3)
            self.stop_timer(0.1)
            # No timeout, one-second delay.
            future = self.instantiate(None, delay_run, (1, operator.add, 2, 3), {})
            self.start_timer()
            future._set_running_or_notify_cancel()
            self.assertEqual(future.result(), 5)
            self.stop_timer(1)
            # Half-second timeout, no delay: finishes in time.
            future = self.instantiate(0.5, delay_run, (0, operator.add, 1, 2), {})
            self.start_timer()
            future._set_running_or_notify_cancel()
            self.assertEqual(future.result(), 3)
            self.stop_timer(0.1)
            # Half-second timeout, one-second delay: times out at about 0.5 s.
            future = self.instantiate(0.5, delay_run, (1, operator.add, 2, 3), {})
            self.start_timer()
            future._set_running_or_notify_cancel()
            self.assertRaises(TimeoutError, future.result)
            self.stop_timer(0.5)

        def test_result_before_running(self):
            # result() is requested before the future is started (the start
            # happens half a second later in another thread). The 0.1 second
            # timeout must only count from the actual start.
            future = self.instantiate(0.1, delay_run, (0, operator.add, 1, 2), {})
            delay_run(0.5, future._set_running_or_notify_cancel, sync=False)
            self.start_timer()
            self.assertEqual(future.result(), 3)
            self.stop_timer(0.5)

        def test_exception_immediate(self):
            data = os.urandom(1 << 20)
            future = self.instantiate(None, return_, (data,), {})
            future._set_running_or_notify_cancel()
            self.assertIs(future.exception(), None)
            test_exception = Exception('test')
            future = self.instantiate(None, throw, (test_exception,), {})
            future._set_running_or_notify_cancel()
            # Compare type and args: the exception crossed a process boundary.
            self.assertIsInstance(future.exception(), type(test_exception))
            self.assertEqual(future.exception().args, test_exception.args)

        def test_exception_delay(self):
            # Same four scenarios as test_result_delay, observed through
            # exception() instead of result().
            future = self.instantiate(None, delay_run, (0, operator.add, 1, 2), {})
            self.start_timer()
            future._set_running_or_notify_cancel()
            self.assertIs(future.exception(), None)
            self.stop_timer(0.1)
            future = self.instantiate(None, delay_run, (1, operator.add, 2, 3), {})
            self.start_timer()
            future._set_running_or_notify_cancel()
            self.assertIs(future.exception(), None)
            self.stop_timer(1)
            future = self.instantiate(0.5, delay_run, (0, operator.add, 1, 2), {})
            self.start_timer()
            future._set_running_or_notify_cancel()
            self.assertIs(future.exception(), None)
            self.stop_timer(0.1)
            future = self.instantiate(0.5, delay_run, (1, operator.add, 2, 3), {})
            self.start_timer()
            future._set_running_or_notify_cancel()
            self.assertIsInstance(future.exception(), TimeoutError)
            self.assertFalse(future.exception().args)
            self.stop_timer(0.5)

        def test_exception_before_running(self):
            # Counterpart of test_result_before_running for exception().
            future = self.instantiate(0.1, delay_run, (0, operator.add, 1, 2), {})
            delay_run(0.5, future._set_running_or_notify_cancel, sync=False)
            self.start_timer()
            self.assertIs(future.exception(), None)
            self.stop_timer(0.5)

        def test_valid_add_done_callback(self):
            future = self.instantiate(None, time.sleep, (0,), {})
            requires_callback = {future}
            # Registered before completion: the callback is deferred.
            future.add_done_callback(requires_callback.remove)
            self.assertIn(future, requires_callback)
            future._set_running_or_notify_cancel()
            self.assertIs(future.exception(), None)
            self.assertFalse(requires_callback)
            # Registered after completion: the callback runs immediately.
            requires_callback.add(future)
            future.add_done_callback(requires_callback.remove)
            self.assertFalse(requires_callback)

        def test_error_add_done_callback(self):
            future = self.instantiate(None, time.sleep, (0,), {})
            requires_callback = [{future} for _ in range(10)]
            callbacks = [s.remove for s in requires_callback]
            error = Exception()
            # A failing callback sits in the middle of ten working ones.
            callbacks.insert(5, functools.partial(throw, error))
            for fn in callbacks:
                future.add_done_callback(fn)
            future._set_running_or_notify_cancel()
            with self.assertRaises(Exception) as cm:
                future.exception()
            # The failure is reported, and every other callback still ran.
            self.assertIs(cm.exception, error)
            self.assertFalse(any(requires_callback))

        def test_set_running_or_notify_cancel(self):
            # First call starts the future; a second call cancels it.
            future = self.instantiate(None, time.sleep, (0.1,), {})
            self.assertFalse(future.running() or future.done())
            future._set_running_or_notify_cancel()
            self.assertTrue(future.running())
            future._set_running_or_notify_cancel()
            self.assertTrue(future.cancelled())
    
    
    # Run the whole test suite when this file is executed as a script.
    if __name__ == '__main__':
        unittest.main()
    

    【讨论】:

      猜你喜欢
      • 2012-10-28
      • 1970-01-01
      • 1970-01-01
      • 2011-10-05
      • 1970-01-01
      • 2021-01-28
      • 2017-11-16
      • 2015-02-21
      • 2011-09-03
      相关资源
      最近更新 更多