【问题标题】:python: how-to do a periodic non-blocking lookuppython:如何进行定期非阻塞查找
【发布时间】:2017-11-29 19:45:17
【问题描述】:

能否请您告知如何定期 执行一个比周期性间隔花费更多时间的任务?

例如:

def lookup():
    # do some lookups, retrieve info, let's assume it takes 60sec to complete
    msg = {'abc':123}
    time.sleep(60)
    return msg

class Publisher(object):
    def __init__(self):
        self._TIMEIT = 0
        self._INTERVAL = 5
        self._counter = 0

    def xxx():
        t_start = time.time()
        msg = lookup()
        # do something with the value returned
        save_msg_to_db(msg)
        self._counter += 1
        t_end = time.time()
        self._TIMEIT = int(math.ceil(t_end - t_start))

    def run():
        while True:
            # let's do the lookup every 5sec, but remember that lookup takes 60sec to complete
            time.sleep(self._INTERVAL)
            # the call to xxx() should be non-blocking
            xxx()

但是run方法负责调度周期性任务, 并且当它迭代时,它不应该在调用函数xxx时阻塞。

我正在考虑在每次调用xxx 函数时创建一个事件循环,如A Bad Coroutine Example 中所述,但是如何对xxx 进行非阻塞调用?

附言。我正在使用新到 asyncio 的 Python3.4(过去使用 gevent),不确定我是否在这里问愚蠢。

所以lookup 将创建一个异步循环,假设需要 60 秒才能完成。但是,在run 方法中,有一个无限循环运行,我希望它每 5 秒进行一次查找,换句话说,我想(1)我多久调用一次查找函数,独立于 (2) 完成查找需要多长时间

【问题讨论】:

  • 您的 lookup() 函数是 CPU 密集型还是由于 I/O 操作而需要很长时间才能完成?
  • : "由于 I/O 操作需要很长时间才能完成"
  • lookup() 运行时会发生什么?如果你想在后台运行它(非阻塞),它需要 60 秒才能完成,并且你很快每 5 秒调用一次,你将有数千个运行。所以必须给出一些东西 - 您的检查间隔(延长它)或对 lookup() 函数的调用(如果前一个没有完成,请不要进行另一个调用)。
  • @zwer 好吧,我相信总会有 60/5=12 个任务,我完全可以接受

标签: python python-3.x nonblocking python-asyncio periodic-task


【解决方案1】:

由于您的 lookup() 主要是 I/O 密集型,您可以将您的 xxx() 方法作为线程运行并且非常好(为简洁起见缩短代码):

import threading
import time

class Publisher(object):

    def __init__(self):
        self._INTERVAL = 5
        self._counter = 0
        self._mutex = threading.Lock()

    def xxx(self):
        msg = lookup()
        save_msg_to_db(msg)
        with self._mutex:  # make sure only one thread is modifying counter at a given time
            self._counter += 1

    def run(self):
        while True:
            time.sleep(self._INTERVAL)
            t = threading.Thread(target=self.xxx)
            t.setDaemon(True)  # so we don't need to track/join threads
            t.start()  # start the thread, this is non-blocking

【讨论】:

  • 谢谢@zwer 让我评估一下,会回复你的
猜你喜欢
  • 2013-05-24
  • 1970-01-01
  • 1970-01-01
  • 2018-07-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-09-27
相关资源
最近更新 更多