【问题标题】:Turn functions with a callback into Python generators?将带有回调的函数转换为 Python 生成器?
【发布时间】:2019-10-17 17:56:07
【问题描述】:

Scipy 最小化函数(仅用作示例)可以选择在每一步添加回调函数。所以我可以做类似的事情,

def my_callback(x):
    print x
scipy.optimize.fmin(func, x0, callback=my_callback)

有没有办法使用回调函数来创建一个生成器版本的fmin,这样我就可以了,

for x in my_fmin(func,x0):
    print x

似乎有可能通过收益率和发送的某种组合,但我可以想到任何事情。

【问题讨论】:

  • 我认为您必须为此使用multithreading,因为您必须同时运行一个输出队列和一个不断产生的生成器。
  • 我认为这不可能。在fmin 的某个地方,对my_callback 的调用需要一个返回值的简单函数。因此,您发送的任何内容都必须尊重该接口。除非我遗漏了什么,否则将其变成生成器的机会在于调用该函数的代码。
  • 这让我想到了 Stackless Python 和 Go 中的 Channels。
  • 这让我想到了call/cc。
  • 当然,在(几乎)任何特定情况下,您也可以复制the source并将the line that does the callback更改为yield

标签: python generator coroutine


【解决方案1】:

对于一个超级简单的方法...

def callback_to_generator():
    data = []
    method_with_callback(blah, foo, callback=data.append)
    for item in data:
        yield item
  • 是的,这不适合大数据
  • 是的,这会阻止首先处理的所有项目
  • 但它仍然可能对某些用例有用:)

还要感谢@winston-ewert,因为这只是他回答的一个小变种:)

【讨论】:

    【解决方案2】:

    处理非阻塞回调的解决方案

    使用threadingqueue 的解决方案相当不错,高性能和跨平台,可能是最好的解决方案。

    这里我提供了这个还不错的解决方案,主要用于处理非阻塞回调,例如通过threading.Thread(target=callback).start()或其他非阻塞方式从父函数调用。

    import pickle
    import select
    import subprocess
    
    def my_fmin(func, x0):
        # open a process to use as a pipeline
        proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    
        def my_callback(x):
            # x might be any object, not only str, so we use pickle to dump it
            proc.stdin.write(pickle.dumps(x).replace(b'\n', b'\\n') + b'\n')
            proc.stdin.flush()
    
        from scipy import optimize
        optimize.fmin(func, x0, callback=my_callback)
    
        # this is meant to handle non-blocking callbacks, e.g. called somewhere 
        # through `threading.Thread(target=callback).start()`
        while select.select([proc.stdout], [], [], 0)[0]:
            yield pickle.loads(proc.stdout.readline()[:-1].replace(b'\\n', b'\n'))
    
        # close the process
        proc.communicate()
    

    然后你可以像这样使用函数:

    # unfortunately, `scipy.optimize.fmin`'s callback is blocking.
    # so this example is just for showing how-to.
    for x in my_fmin(lambda x: x**2, 3):
        print(x)
    

    虽然这个解决方案看起来相当简单易读,但它的性能不如threadingqueue 解决方案,因为:

    • 进程比线程重得多。
    • 通过管道而不是内存传递数据要慢得多。

    此外,它不适用于 Windows,因为 Windows 上的 select 模块只能处理套接字,不能处理管道和其他文件描述符。

    【讨论】:

    • “使用[仅限python标准库]的解决方案很好,但不是pythonic” - 需要引用。您的解决方案仅适用于具有 cat 的 posix 系统,并且进程通常比线程更昂贵。
    • @Eric 感谢您的更正,这很有帮助。我已经发布了我更正的答案,这已经解决了问题。
    【解决方案3】:

    Frits 回答的一个变体,即:

    • 支持send为回调选择返回值
    • 支持throw为回调选择异常
    • 支持close优雅关机
    • 在请求队列项之前不计算队列项

    完整的测试代码见on github

    import queue
    import threading
    import collections.abc
    
    class generator_from_callback(collections.abc.Generator):
        def __init__(self, expr):
            """
            expr: a function that takes a callback
            """ 
            self._expr = expr
            self._done = False
            self._ready_queue = queue.Queue(1)
            self._done_queue = queue.Queue(1)
            self._done_holder = [False]
    
            # local to avoid reference cycles
            ready_queue = self._ready_queue
            done_queue = self._done_queue
            done_holder = self._done_holder
    
            def callback(value):
                done_queue.put((False, value))
                cmd, *args = ready_queue.get()
                if cmd == 'close':
                    raise GeneratorExit
                elif cmd == 'send':
                    return args[0]
                elif cmd == 'throw':
                    raise args[0]
    
            def thread_func():
                try:
                    cmd, *args = ready_queue.get()
                    if cmd == 'close':
                        raise GeneratorExit
                    elif cmd == 'send':
                        if args[0] is not None:
                            raise TypeError("can't send non-None value to a just-started generator")
                    elif cmd == 'throw':
                        raise args[0]
                    ret = expr(callback)
                    raise StopIteration(ret)
                except BaseException as e:
                    done_holder[0] = True
                    done_queue.put((True, e))
            self._thread = threading.Thread(target=thread_func)
            self._thread.start()
    
        def __next__(self):
            return self.send(None)
    
        def send(self, value):
            if self._done_holder[0]:
                raise StopIteration
            self._ready_queue.put(('send', value))
            is_exception, val = self._done_queue.get()
            if is_exception:
                raise val
            else:
                return val
    
        def throw(self, exc):
            if self._done_holder[0]:
                raise StopIteration
            self._ready_queue.put(('throw', exc))
            is_exception, val = self._done_queue.get()
            if is_exception:
                raise val
            else:
                return val
    
        def close(self):
            if not self._done_holder[0]:
                self._ready_queue.put(('close',))
            self._thread.join()
    
        def __del__(self):
            self.close()
    

    作用如下:

    In [3]: def callback(f):
       ...:     ret = f(1)
       ...:     print("gave 1, got {}".format(ret))
       ...:     f(2)
       ...:     print("gave 2")
       ...:     f(3)
       ...:
    
    In [4]: i = generator_from_callback(callback)
    
    In [5]: next(i)
    Out[5]: 1
    
    In [6]: i.send(4)
    gave 1, got 4
    Out[6]: 2
    
    In [7]: next(i)
    gave 2, got None
    Out[7]: 3
    
    In [8]: next(i)
    StopIteration
    

    对于scipy.optimize.fmin,您将使用generator_from_callback(lambda c: scipy.optimize.fmin(func, x0, callback=c))

    【讨论】:

      【解决方案4】:

      生成器作为协程(无线程)

      FakeFtpretrbinary 函数使用回调,每次成功读取数据块时都会调用:

      class FakeFtp(object):
          def __init__(self):
              self.data = iter(["aaa", "bbb", "ccc", "ddd"])
      
          def login(self, user, password):
              self.user = user
              self.password = password
      
          def retrbinary(self, cmd, cb):
              for chunk in self.data:
                  cb(chunk)
      

      使用简单的回调函数有个缺点,就是反复调用,回调 函数不能轻易地在调用之间保持上下文。

      以下代码定义了process_chunks 生成器,它将能够接收数据块之一 通过一个并处理它们。与简单的回调相比,这里我们可以保留所有 在一个函数内处理而不丢失上下文。

      from contextlib import closing
      from itertools import count
      
      
      def main():
          processed = []
      
          def process_chunks():
              for i in count():
                  try:
                      # (repeatedly) get the chunk to process
                      chunk = yield
                  except GeneratorExit:
                      # finish_up
                      print("Finishing up.")
                      return
                  else:
                      # Here process the chunk as you like
                      print("inside coroutine, processing chunk:", i, chunk)
                      product = "processed({i}): {chunk}".format(i=i, chunk=chunk)
                      processed.append(product)
      
          with closing(process_chunks()) as coroutine:
              # Get the coroutine to the first yield
              coroutine.next()
              ftp = FakeFtp()
              # next line repeatedly calls `coroutine.send(data)`
              ftp.retrbinary("RETR binary", cb=coroutine.send)
              # each callback "jumps" to `yield` line in `process_chunks`
      
          print("processed result", processed)
          print("DONE")
      

      要查看实际代码,请放入FakeFtp 类,代码显示在上面和下面一行:

      main()
      

      放入一个文件并调用它:

      $ python headsandtails.py
      ('inside coroutine, processing chunk:', 0, 'aaa')
      ('inside coroutine, processing chunk:', 1, 'bbb')
      ('inside coroutine, processing chunk:', 2, 'ccc')
      ('inside coroutine, processing chunk:', 3, 'ddd')
      Finishing up.
      ('processed result', ['processed(0): aaa', 'processed(1): bbb', 'processed(2): ccc', 'processed(3): ddd'])
      DONE
      

      工作原理

      processed = [] 在这里只是为了说明,生成器process_chunks 应该没有问题 配合其外部环境。全部包裹成def main():证明,不需要 使用全局变量。

      def process_chunks() 是解决方案的核心。它可能有一个镜头输入参数(不是 此处使用),但主要的一点是,它接收输入的每个 yield 行返回任何人发送的内容 通过.send(data) 进入这个生成器的实例。可以coroutine.send(chunk),但在此示例中,它是通过引用此函数callback.send 的回调来完成的。

      注意,在实际解决方案中,代码中有多个yields 是没有问题的,它们是 一一处理。这可能用于例如读取(并忽略)CSV 文件的标题,然后 继续处理带有数据的记录。

      我们可以按如下方式实例化和使用生成器:

      coroutine = process_chunks()
      # Get the coroutine to the first yield
      coroutine.next()
      
      ftp = FakeFtp()
      # next line repeatedly calls `coroutine.send(data)`
      ftp.retrbinary("RETR binary", cb=coroutine.send)
      # each callback "jumps" to `yield` line in `process_chunks`
      
      # close the coroutine (will throw the `GeneratorExit` exception into the
      # `process_chunks` coroutine).
      coroutine.close()
      

      真正的代码是使用contextlibclosing上下文管理器来确保coroutine.close()是 总是被调用。

      结论

      此解决方案不提供某种迭代器来以传统样式“从 外部”。另一方面,我们能够:

      • “从内部”使用生成器
      • 将所有迭代处理保留在一个函数中,而不会在回调之间中断
      • 可选择使用外部上下文
      • 向外部提供有用的结果
      • 所有这些都可以在不使用线程的情况下完成

      致谢:该解决方案深受 user2357112

      所写的 SO answer Python FTP “chunk” iterator (without loading entire file into memory) 的启发

      【讨论】:

      • 很好的答案,谢谢。如果你明确定义了一个上下文管理器,你可以在其中调用 coroutine.next() ,这是值得的,对吧?
      • 这篇文章对协程的使用很有启发。但是让我不解的是main这个函数和下面的有什么不同吗? def main(): processed = []; ftp.retrbinary("RETR binary", cb=processed.append); return processed 不知道是不是我误解了你的回答,但我认为问题的关键在于“函数应该能够处理无限次回调而不会爆炸内存,就像流或管道一样” .我认为这就是为什么我们要使用一些yield,但显然,processed 列表破坏了计划......
      • @henryzhu 您缩短的main 可以工作,但不能作为使用生成器的示例。 processed 列表只是为了证明我们已经处理了什么,可以通过将数据写入文件或其他流来替换它,而不是处理无限大小的项目/数据。问题要求重写生成器的回调,所以我做了它并保持其余部分简短(因此使用processed 列表,而不是输出流)。
      • @JanVlcinsky 哦,是的,我明白了。所以我认为这个答案的目的主要是为了使用生成器,而不是为了回应提问者的提问:for x in my_fmin(func,x0): print x。毕竟,如果我们将processed列表的数据写入到文件或者其他流中,就不能像上图那样通过for循环进行迭代。尽管如此,这仍然是一个很好的答案。
      【解决方案5】:

      概念使用带有maxsize=1 和生产者/消费者模型的阻塞队列。

      回调产生,然后对回调的下一次调用将阻塞整个队列。

      消费者然后从队列中产生值,尝试获取另一个值,并在读取时阻塞。

      生产者被允许推入队列,冲洗并重复。

      用法:

      def dummy(func, arg, callback=None):
        for i in range(100):
          callback(func(arg+i))
      
      # Dummy example:
      for i in Iteratorize(dummy, lambda x: x+1, 0):
        print(i)
      
      # example with scipy:
      for i in Iteratorize(scipy.optimize.fmin, func, x0):
         print(i)
      

      可以按预期用于迭代器:

      for i in take(5, Iteratorize(dummy, lambda x: x+1, 0)):
        print(i)
      

      迭代类:

      from thread import start_new_thread
      from Queue import Queue
      
      class Iteratorize:
        """ 
        Transforms a function that takes a callback 
        into a lazy iterator (generator).
        """
        def __init__(self, func, ifunc, arg, callback=None):
          self.mfunc=func
          self.ifunc=ifunc
          self.c_callback=callback
          self.q = Queue(maxsize=1)
          self.stored_arg=arg
          self.sentinel = object()
      
          def _callback(val):
            self.q.put(val)
      
          def gentask():
            ret = self.mfunc(self.ifunc, self.stored_arg, callback=_callback)
            self.q.put(self.sentinel)
            if self.c_callback:
              self.c_callback(ret)
      
          start_new_thread(gentask, ())
      
        def __iter__(self):
          return self
      
        def next(self):
          obj = self.q.get(True,None)
          if obj is self.sentinel:
           raise StopIteration 
          else:
            return obj
      

      可能需要进行一些清理以接受 *args**kwargs 以用于被包装的函数和/或最终结果回调。

      【讨论】:

      • +1 用于概括任何函数的代码,但为了完整起见,请参阅我的更新答案。 maxsize=1 是不够的,如果你想阻塞生产者直到消费者完成它,最好使用 Queue.joinQueue.task_done。 (如果您想要那样,那么 maxsize 的意义何在?)我重申我对 Winston Ewert 回答的评论:没有办法干净地退出函数 - take 示例将永远阻塞线程,永远不会释放与其关联的资源。对于这个问题,不幸的是,我知道没有简单的解决方案。
      • 你是对的!使用 Queue.join 实际上也更正确!它将阻止回调在阻塞之前被第二次调用,从而在底层函数有副作用时导致正确的行为。 +1 不错的收获。
      【解决方案6】:

      正如 cmets 中所指出的,您可以使用 Queue 在新线程中执行此操作。缺点是您仍然需要某种方式来访问最终结果(fmin 最后返回的内容)。我下面的示例使用一个可选的回调来处理它(另一种选择是也只产生它,尽管您的调用代码必须区分迭代结果和最终结果):

      from thread import start_new_thread
      from Queue import Queue
      
      def my_fmin(func, x0, end_callback=(lambda x:x), timeout=None):
      
          q = Queue() # fmin produces, the generator consumes
          job_done = object() # signals the processing is done
      
          # Producer
          def my_callback(x):
              q.put(x)
          def task():
              ret = scipy.optimize.fmin(func,x0,callback=my_callback)
              q.put(job_done)
              end_callback(ret) # "Returns" the result of the main call
      
          # Starts fmin in a new thread
          start_new_thread(task,())
      
          # Consumer
          while True:
              next_item = q.get(True,timeout) # Blocks until an input is available
              if next_item is job_done:
                  break
              yield next_item
      

      更新:要阻止下一次迭代的执行,直到消费者处理完最后一次,还需要使用task_donejoin

          # Producer
          def my_callback(x):
              q.put(x)
              q.join() # Blocks until task_done is called
      
          # Consumer
          while True:
              next_item = q.get(True,timeout) # Blocks until an input is available
              if next_item is job_done:
                  break
              yield next_item
              q.task_done() # Unblocks the producer, so a new iteration can start
      

      请注意,maxsize=1 不是必需的,因为在消耗完最后一项之前不会将新项添加到队列中。

      更新 2: 另请注意,除非此生成器最终检索到所有项目,否则创建的线程将死锁(它将永远阻塞并且永远不会释放其资源)。生产者在队列中等待,因为它存储了对该队列的引用,所以即使消费者在,它也永远不会被 gc 回收。然后队列将变得无法访问,因此没有人能够释放锁。

      如果可能的话,一个干净的解决方案是未知的(因为它取决于用于代替fmin 的特定函数)。可以使用timeout 进行变通,如果put 阻塞时间过长,则让生产者引发异常:

          q = Queue(maxsize=1)
      
          # Producer
          def my_callback(x):
              q.put(x)
              q.put("dummy",True,timeout) # Blocks until the first result is retrieved
              q.join() # Blocks again until task_done is called
      
          # Consumer
          while True:
              next_item = q.get(True,timeout) # Blocks until an input is available
              q.task_done()                   # (one "task_done" per "get")
              if next_item is job_done:
                  break
              yield next_item
              q.get() # Retrieves the "dummy" object (must be after yield)
              q.task_done() # Unblocks the producer, so a new iteration can start
      

      【讨论】:

      • 与@Winston Ewert 的回答相同。这将评估回调,直到 fmin 函数返回。它将在生成器开始产生之前强制评估每个元素的回调。
      • 非常好。要解决@brice 的观点,请使用Queue(maxsize=1)q.put(x,block=True)。否则我看不出有任何问题。
      • 打败我吧,@marius。看我的回答。
      • 在这种情况下,maxsize=1 是不够的,因为 fmin 将在第一个项目被消耗时继续处理下一次迭代(因为它在第一次放置时没有阻塞;它只会阻塞 在第二次迭代完成后,它试图将结果放入队列中)。请参阅我的更新答案。
      • 至少在 Python 2.6 中,对 q.get 的两次调用都需要调用 q.task_done(),而不仅仅是在虚拟 get 之后。
      【解决方案7】:

      怎么样

      data = []
      scipy.optimize.fmin(func,x0,callback=data.append)
      for line in data:
          print line
      

      如果不是,您究竟想对生成器的数据做什么?

      【讨论】:

      • 我认为他提出的问题是笼统的:如何将回调转换为生成器,而不是询问特定情况。
      • 这是对的,但我应该更清楚。回调版本在每一步评估回调(这是我想要的),而这会完成整个最小化,然后在每一步调用回调代码。
      • @brice 我看到的从任意函数创建无限生成器的主要问题是如何在生成器完成后发出信号以停止其执行。在我的answer's 示例中,一个选项是将队列大小设置为 1 并向生产者添加超时,因此当消费者不请求新值时它会阻塞,并且在一段时间后被异常“杀死”队列增加。但是对于一个干净的退出,您需要有关该特定功能的特定信息,或者它已经具有与生成器接口的方法。
      猜你喜欢
      • 2016-05-20
      • 1970-01-01
      • 2023-03-24
      • 2020-02-22
      • 2018-07-09
      • 2011-11-08
      • 2019-05-28
      • 2021-01-03
      相关资源
      最近更新 更多