【问题标题】:Queues and multiprocessing队列和多处理
【发布时间】:2013-12-02 20:48:46
【问题描述】:

我正在编写一些代码来构建可变长度 (Huffman) 代码表,并且我想使用多处理模块来获得乐趣。这个想法是让每个进程尝试从队列中获取一个节点。他们确实在节点上工作,或者将该节点的两个子节点放回工作队列,或者将可变长度代码放入结果队列。他们还将消息传递到消息队列,该消息队列应由主进程中的线程打印。到目前为止的代码如下:

import Queue
import multiprocessing as mp
from threading import Thread
from collections import Counter, namedtuple

Node = namedtuple("Node", ["child1", "child2", "weight", "symbol", "code"])

def _sort_func(node):
    return node.weight

def _encode_proc(proc_number, work_queue, result_queue, message_queue):
    while True:
        try:
            #get a node from the work queue
            node = work_queue.get(timeout=0.1)
            #if it is an end node, add the symbol-code pair to the result queue
            if node.child1 == node.child2 == None:
                message_queue.put("Symbol processed! : proc%d" % proc_number)
                result_queue.put({node.symbol:node.code})
            #otherwise do some work and add some nodes to the work queue
            else:
                message_queue.put("More work to be done! : proc%d" % proc_number)
                node.child1.code.append(node.code + '0')
                node.child2.code.append(node.code + '1')
                work_queue.put(node.child1)
                work_queue.put(node.child2)
        except Queue.Empty: #everything is probably done
            return

def _reporter_thread(message_queue):
    while True:
        try:
            message = message_queue.get(timeout=0.1)
            print message
        except Queue.Empty: #everything is probably done
            return

def _encode_tree(tree, symbol_count):
    """Uses multiple processes to walk the tree and build the huffman codes."""
    #Create a manager to manage the queues, and a pool of workers.
    manager = mp.Manager()
    worker_pool = mp.Pool()
    #create the queues you will be using
    work = manager.Queue()
    results = manager.Queue()
    messages = manager.Queue()
    #add work to the work queue, and start the message printing thread
    work.put(tree)
    message_thread = Thread(target=_reporter_thread, args=(messages,))
    message_thread.start()
    #add the workers to the pool and close it
    for i in range(mp.cpu_count()):
        worker_pool.apply_async(_encode_proc, (i, work, results, messages))
    worker_pool.close()
    #get the results from the results queue, and update the table of codes
    table = {}
    while symbol_count > 0:
        try:
            processed_symbol = results.get(timeout=0.1)
            table.update(processed_symbol)
            symbol_count -= 1
        except Queue.Empty:
            print "WAI DERe NO SYMBOLzzzZzz!!!"
        finally:
            print "Symbols to process: %d" % symbol_count
    return table

def make_huffman_table(data):
    """
    data is an iterable containing the string that needs to be encoded.
    Returns a dictionary mapping symbols to codes.
    """
    #Build a list of Nodes out of the characters in data
    nodes = [Node(None, None, weight, symbol, bytearray()) for symbol, weight in Counter(data).items()]
    nodes.sort(reverse=True, key=_sort_func)
    symbols = len(nodes)
    append_node = nodes.append
    while len(nodes) > 1:
        #make a new node out of the two nodes with the lowest weight and add it to the list of nodes.
        child2, child1 = nodes.pop(), nodes.pop()
        new_node = Node(child1, child2, child1.weight+child2.weight, None, bytearray())
        append_node(new_node)
        #then resort the nodes
        nodes.sort(reverse=True, key=_sort_func)
    top_node = nodes[0]
    return _encode_tree(top_node, symbols)

def chars(fname):
    """
    A simple generator to make reading from files without loading them 
    totally into memory a simple task.
    """
    f = open(fname)
    char = f.read(1)
    while char != '':
        yield char
        char = f.read(1)
    f.close()
    raise StopIteration

if __name__ == "__main__":
    text = chars("romeo-and-juliet.txt")
    table = make_huffman_table(text)
    print table

这个的当前输出是:

More work to be done! : proc0
WAI DERe NO SYMBOLzzzZzz!!!
Symbols to process: 92
WAI DERe NO SYMBOLzzzZzz!!!
Symbols to process: 92
WAI DERe NO SYMBOLzzzZzz!!!
Symbols to process: 92

它只是永远重复最后一点。在第一个进程向节点添加工作后,一切都停止了。这是为什么?我不理解/正确使用队列吗?抱歉,所有代码都无法阅读。

【问题讨论】:

    标签: python multithreading queue multiprocessing huffman-code


    【解决方案1】:

    您的第一个问题是尝试使用超时。他们几乎从来都不是一个好主意。如果您想不出一种可靠的方法来有效地做某事,他们可能是个好主意,并且您只将超时用作第一个 检查某件事是否真的完成了。

    也就是说,主要问题是multiprocessing 在报告工作进程中发生的异常方面通常非常糟糕。您的代码实际上在这里死了:

    node.child1.code.append(node.code + '0')
    

    您没有看到的错误消息是“需要一个大小为 1 的整数或字符串”。您不能将bytearray 附加到bytearray。你想做的:

    node.child1.code.extend(node.code + '0')
                     ^^^^^^
    

    相反,在child2 的类似行中。照原样,因为第一个从工作队列中取出某些东西的工作进程死了,所以没有更多的东西被添加到工作队列中。这解释了你所看到的一切 - 到目前为止;-)

    没有超时

    仅供参考,避免超时(不稳定 - 不可靠)的常用方法是在队列中放置一个特殊的哨兵值。消费者在看到哨兵时就知道是时候退出了,并使用普通的阻塞.get() 从队列中检索项目。所以首先要做的是创建一个哨兵;例如,在顶部附近添加:

    ALL_DONE = "all done"
    

    最佳实践是.join()线程和进程 - 这样主程序知道(不仅仅是猜测)他们也完成了。

    所以,您可以像这样更改_encode_tree() 的结尾:

    for i in range(1, symbol_count + 1):
        processed_symbol = results.get()
        table.update(processed_symbol)
        print "Symbols to process: %d" % (symbol_count - i)
    for i in range(mp.cpu_count()):
        work.put(ALL_DONE)
    worker_pool.join()
    messages.put(ALL_DONE)
    message_thread.join()
    return table
    

    这里的关键是,主程序知道所有工作都在何时且仅在没有符号需要处理时才完成。在此之前,它可以无条件地从results 队列中得到.get() 结果。然后它会在工作队列中放置与工人数量相等的哨兵。他们每个人都会消耗一个哨兵并退出。然后我们等待他们完成 (worker_pool.join())。然后在消息队列中放置一个哨兵,我们也等待该线程结束。只有这样,函数才会返回。

    现在没有什么会提前终止,一切都干净利落地关闭,最终表格的输出不再与工作人员和消息线程的各种其他输出混在一起。 _reporter_thread() 被重写如下:

    def _reporter_thread(message_queue):
        while True:
            message = message_queue.get()
            if message == ALL_DONE:
                break
            else:
                print message
    

    _encode_proc() 也是如此。不再有超时或try/except Queue.Empty: 摆弄。你甚至不必再导入Queue :-)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-04-18
      • 2020-05-18
      • 2016-05-02
      • 1970-01-01
      • 2015-10-11
      • 2010-10-29
      • 1970-01-01
      相关资源
      最近更新 更多