【问题标题】:How to design an async pipeline pattern in python如何在 python 中设计异步管道模式
【发布时间】:2016-06-20 11:13:04
【问题描述】:

我正在尝试设计一个可以轻松制作数据处理管道的异步管道。管道由几个功能组成。输入数据从管道的一端进入,从另一端出来。

我想以如下方式设计管道:

  1. 可以在管道中插入其他函数
  2. 可以弹出已经在管道中的函数。

这是我想出的:

import asyncio

@asyncio.coroutine
def add(x):
    return x + 1

@asyncio.coroutine
def prod(x):
    return x * 2

@asyncio.coroutine
def power(x):
    return x ** 3

def connect(funcs):
    def wrapper(*args, **kwargs):
        data_out = yield from funcs[0](*args, **kwargs)
        for func in funcs[1:]:
            data_out = yield from func(data_out)
        return data_out
    return wrapper

pipeline = connect([add, prod, power])
input = 1
output = asyncio.get_event_loop().run_until_complete(pipeline(input))
print(output)

这当然可行,但问题是如果我想在这个管道中添加另一个函数(或从中弹出一个函数),我必须重新分解并重新连接每个函数。

我想知道是否有更好的方案或设计模式来创建这样的管道?

【问题讨论】:

  • 我认为标准的做法是重新创建管道,例如 connect([add, prod, somethingelse, power])connect([add, power])。你有理由不想这样做吗?或者我不明白你的问题?
  • 我想你明白我的意思了,我不想重新创建整个东西,因为如果管道包含几十个功能,当你只需要更改一小部分时重新创建所有东西并不优雅,而且我需要经常更改一些功能,重新创建一切变得乏味和低效。
  • 您似乎可以创建一个 Pipeline 类并使用您的函数列表维护一个实例 var,然后实现从该列表中获取/删除函数的方法。然后只需实现__call__ 以便可以将 Pipeline 的实例发送到 asyncio 事件循环。
  • @EricConner 我不完全理解您的建议,按照您的建议,我认为问题在于如何实现获取/删除功能?我是否必须重新连接每个功能才能更改一个功能?
  • 您能否将您的函数存储在列表中,然后让您的管道引用这些列表?比如L1 = [add, prod, power]pipeline1 = connect(L1)pipeline2 = connect(L1 + [power])pipeline3 = connect([x for x in L1 if x != add])

标签: python python-3.x asynchronous python-asyncio python-decorators


【解决方案1】:

我之前做过类似的事情,只使用了multiprocessing 库。它有点手动,但它使您能够按照您在问题中的要求轻松创建和修改管道。

这个想法是创建可以存在于多处理池中的函数,它们的唯一参数是输入队列和输出队列。您通过传递不同的队列将这些阶段联系在一起。每个阶段在其输入队列上接收一些工作,做更多工作,然后通过其输出队列将结果传递到下一个阶段。

工人不断尝试从他们的队列中获取一些东西,当他们得到一些东西时,他们会做他们的工作并将结果传递到下一个阶段。所有工作都以通过管道传递“毒丸”结束,导致所有阶段退出:

这个例子只是在多个工作阶段构建一个字符串:

import multiprocessing as mp                                              

POISON_PILL = "STOP"                                                      

def stage1(q_in, q_out):                                                  

    while True:

        # get either work or a poison pill from the previous stage (or main)
        val = q_in.get()                                                  

        # check to see if we got the poison pill - pass it along if we did
        if val == POISON_PILL:                                            
            q_out.put(val)                                                
            return                                                        

        # do stage 1 work                                                                  
        val = val + "Stage 1 did some work.\n"

        # pass the result to the next stage
        q_out.put(val)                                                    

def stage2(q_in, q_out):                                                  

    while True:                                                           

        val = q_in.get()                                                  
        if val == POISON_PILL:                                            
            q_out.put(val)                                                
            return                                                        

        val = val + "Stage 2 did some work.\n"                            
        q_out.put(val)                                                    

def main():                                                               

    pool = mp.Pool()                                                      
    manager = mp.Manager()                                                

    # create managed queues                                               
    q_main_to_s1 = manager.Queue()                                        
    q_s1_to_s2 = manager.Queue()                                          
    q_s2_to_main = manager.Queue()                                        

    # launch workers, passing them the queues they need                   
    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))     
    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))     

    # Send a message into the pipeline                                    
    q_main_to_s1.put("Main started the job.\n")                           

    # Wait for work to complete                                           
    print(q_s2_to_main.get()+"Main finished the job.")                    

    q_main_to_s1.put(POISON_PILL)                                         

    pool.close()                                                          
    pool.join()                                                           

    return                                                                

if __name__ == "__main__":                                                
    main()

代码产生这个输出:

Main 开始了这项工作。
第 1 阶段做了一些工作。
第 2 阶段做了一些工作。
主要完成了工作。

您可以轻松地将更多阶段放入管道中,或者只需更改哪些函数获取哪些队列即可重新排列它们。我对asyncio 模块不是很熟悉,所以我不能说使用多处理库会失去什么功能,但是这种方法很容易实现和理解,所以我喜欢它的简单性。

【讨论】:

    【解决方案2】:

    我不知道这是否是最好的方法,但这是我的解决方案。

    虽然我认为可以使用列表或字典来控制管道,但我发现使用生成器更容易、更有效。

    考虑以下生成器:

    def controller():
        old = value = None
        while True:
            new = (yield value)
            value = old
            old = new
    

    这基本上是一个单元素队列,它存储您发送它的值并在下次调用send(或next)时释放它。

    例子:

    >>> c = controller()
    >>> next(c)           # prime the generator
    >>> c.send(8)         # send a value
    >>> next(c)           # pull the value from the generator
    8
    

    通过将管道中的每个协程与其控制器相关联,我们将拥有一个外部句柄,我们可以使用它来推送每个协程的目标。我们只需要定义我们的协程,让它们在每个周期都从我们的控制器中拉出新的目标。

    现在考虑以下协程:

    def source(controller):
        while True:
            target = next(controller)
            print("source sending to", target.__name__) 
            yield (yield from target)
    
    def add():
        return (yield) + 1
    
    def prod():
        return (yield) * 2
    

    源是一个没有return的协程,因此它不会在第一个循环后自行终止。其他协程是“汇”,不需要控制器。 您可以在管道中使用这些协程,如下例所示。我们最初设置了一个路由source --> add,在收到第一个结果后,我们将路由更改为source --> prod

    # create a controller for the source and prime it 
    cont_source = controller()
    next(cont_source)
    
    # create three coroutines
    # associate the source with its controller
    coro_source = source(cont_source)
    coro_add = add()
    coro_prod = prod()
    
    # create a pipeline
    cont_source.send(coro_add)
    
    # prime the source and send a value to it
    coro_source.send(None)
    print("add =", coro_source.send(4))
    
    # change target of the source
    cont_source.send(coro_prod)
    
    # reset the source, send another value
    coro_source.send(None)
    print("prod =", coro_source.send(8))
    

    输出:

    source sending to add
    add = 5
    source sending to prod
    prod = 16
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-12-10
      • 2023-03-29
      • 1970-01-01
      • 2016-06-10
      • 2010-12-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多