【Question title】: Logging with multiprocessing madness
【Posted】: 2013-12-18 09:37:19
【Question】:

I am trying to use Python's default logging module in a multiprocessing scenario. I have read:

  1. Python MultiProcess, Logging, Various Classes
  2. Logging using multiprocessing

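as well as several other posts about multiprocessing, logging, Python classes and so on. After all this reading I arrived at the following code, which uses the QueueHandler from Python's logutils, and I cannot get it to run correctly: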

import sys
import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time

from logutils.queue import QueueListener, QueueHandler


class Worker(Process):

    def __init__(self, n, q):
        super(Worker, self).__init__()
        self.n = n
        self.queue = q

        self.qh = QueueHandler(self.queue)
        self.root = logging.getLogger()
        self.root.addHandler(self.qh)
        self.root.setLevel(logging.DEBUG)        
        self.logger = logging.getLogger("W%i"%self.n)


    def run(self):
        self.logger.info("Worker %i Starting"%self.n)

        for i in xrange(10):
            self.logger.log(INFO, "testing %i"%i)

        self.logger.log(INFO, "Completed %i"%self.n)


def listener_process(queue):
    while True:
        try:
            record = queue.get()
            if record is None:
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            import sys, traceback
            print >> sys.stderr, 'Whoops! Problem:'
            traceback.print_exc(file=sys.stderr)

if __name__ == "__main__":

    mpq = mpQueue(-1)

    root = logging.getLogger()
    h = logging.StreamHandler()    
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s     %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

    l = logging.getLogger("Test")
    l.setLevel(logging.DEBUG)

    listener = Process(target=listener_process,
                       args=(mpq,))
    listener.start()
    workers=[]
    for i in xrange(1):
        worker = Worker(i, mpq)
        worker.daemon = True
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()

    mpq.put_nowait(None)
    listener.join()

    for i in xrange(10):
        l.info("testing %i"%i)

    print "Finish"

When the code is run, the output somehow duplicates lines, like this:

2013-12-02 16:44:46,002 Worker-2   W0 INFO         Worker 0 Starting
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 0
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 1
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 2
2013-12-02 16:44:46,002 Worker-2   W0 INFO         Worker 0 Starting
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 3
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 0
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 1
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 4
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 2
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 3
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 5
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 4
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 6
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 5
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 7
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 6
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 8
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 7
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 9
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 8
2013-12-02 16:44:46,004 Worker-2   W0 INFO         Completed 0
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 9
2013-12-02 16:44:46,004 Worker-2   W0 INFO         Completed 0
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 0
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 1
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 2
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 3
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 4
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 5
2013-12-02 16:44:46,006 MainProcess Test INFO         testing 6
2013-12-02 16:44:46,006 MainProcess Test INFO         testing 7
2013-12-02 16:44:46,006 MainProcess Test INFO         testing 8
2013-12-02 16:44:46,006 MainProcess Test INFO         testing 9
Finish

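In other questions, the suggested cause was adding handlers multiple times; however, as you can see, I add the stream handler only once, in the main method. I have also tried embedding the main method in a class, with the same result.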

Edit: As suggested by @max (or at least as I understood him), I have changed the worker class code to:

class Worker(Process):

    root = logging.getLogger()
    qh = None

    def __init__(self, n, q):
        super(Worker, self).__init__()
        self.n = n
        self.queue = q

        if not self.qh:
            Worker.qh = QueueHandler(self.queue)            
            Worker.root.addHandler(self.qh)
            Worker.root.setLevel(logging.DEBUG)

        self.logger = logging.getLogger("W%i"%self.n)

        print self.root.handlers

    def run(self):
        self.logger.info("Worker %i Starting"%self.n)

        for i in xrange(10):
            self.logger.log(INFO, "testing %i"%i)

        self.logger.log(INFO, "Completed %i"%self.n)

Same result: the queue handler is no longer added over and over, but there are still duplicated log entries, even with only one worker.

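Edit 2: I have changed the code a bit. I replaced the listener process with a QueueListener (which is what I intended from the start) and moved the main code into a class.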

import sys

import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time

from logutils.queue import QueueListener, QueueHandler

root = logging.getLogger()
added_qh = False

class Worker(Process):

    def __init__(self, logconf, n, qh):
        super(Worker, self).__init__()
        self.n = n
        self.logconf = logconf

#        global root
        global added_qh

        if not added_qh:
            added_qh = True
            root.addHandler(qh)
            root.setLevel(logging.DEBUG)            

        self.logger = logging.getLogger("W%i"%self.n)

        #print root.handlers

    def run(self):
        self.logger.info("Worker %i Starting"%self.n)

        for i in xrange(10):
            self.logger.log(INFO, "testing %i"%i)

        self.logger.log(INFO, "Completed %i"%self.n)


class Main(object):

    def __init__(self):
        pass

    def start(self):

        mpq = mpQueue(-1)
        qh = QueueHandler(mpq)

        h = logging.StreamHandler()

        ql = QueueListener(mpq, h)

        #h.setFormatter(f)
        root.addHandler(qh)

        l = logging.getLogger("Test")
        l.setLevel(logging.DEBUG)

        logconf = None  # not defined anywhere in the posted snippet; Worker stores it but never uses it
        workers=[]

        for i in xrange(15):
            worker = Worker(logconf, i, qh)
            worker.daemon = True
            worker.start()
            workers.append(worker)

        for worker in workers:
            print "joining worker: {}".format(worker)
            worker.join()

        mpq.put_nowait(None)

        ql.start()

        # listener.join()

        for i in xrange(10):
            l.info("testing %i"%i)

if __name__ == "__main__":


    x = Main()
    x.start()

    time.sleep(10)

    print "Finish"

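Now it mostly works, until I reach a certain number of workers (~15); at that point, for some reason, the Main class blocks in the join and the remaining workers do nothing.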
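A possible explanation for the hang, offered as a hedged sketch rather than a confirmed diagnosis: the multiprocessing programming guidelines ("Joining processes that use queues") warn that a process which has put items on a queue will not terminate until those items have been flushed to the underlying pipe, so joining it before anything consumes the queue can deadlock. In Edit 2, ql.start() is only called after all the worker.join() calls, which looks consistent with the hang appearing only once enough workers have logged enough data to fill the pipe. The minimal Python 3 sketch below leaves logging out and shows the safe ordering (consume, then join); producer and run are hypothetical names, and the fork start method is assumed (Unix only, like the Linux setup here).

```python
import multiprocessing as mp

ctx = mp.get_context("fork")  # Unix only, matching the Linux setup in the question


def producer(q, n_items):
    # put() hands each item to a background feeder thread; the process does not
    # finish exiting until that thread has written everything into the pipe.
    for _ in range(n_items):
        q.put("x" * 1000)


def run(n_workers=4, n_items=200):
    q = ctx.Queue()
    procs = [ctx.Process(target=producer, args=(q, n_items))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    # Consume first, then join. Joining first can hang once the total data
    # (roughly 800 KB with the defaults) exceeds what the pipe can buffer.
    received = [q.get() for _ in range(n_workers * n_items)]
    for p in procs:
        p.join()
    return len(received)
```

Applied to Edit 2, the same idea would mean calling ql.start() before the loop that joins the workers.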

【Comments】:

  • Are you using Windows or *nix?
  • I am running this code on Linux with Python 2.7.

Tags: python logging multiprocessing


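【Solution 1】:

I came up with a very simple workaround using monkey patching. It is probably not robust, and I am no expert on the logging module, but it seemed like the best solution for my situation. After trying some code changes (to allow passing in an existing logger from multiprocessing.get_logger()), I did not like how much the code was changing, and came up with a quick (or it would have been, had I done this in the first place), easy-to-read hack/workaround:

(Working example, with a multiprocessing pool)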

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name gets discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)

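Of course, this probably won't cover the full range of logging usage, but I think the concept is simple enough to get working quickly and relatively painlessly. It should also be easy to modify (for example, the lambda discards the name that can be passed to getLogger).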
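As the answer notes, the lambda throws away the name passed to getLogger. A minimal sketch of one way to keep it, assuming Python 3 (the class NamedFakeLogger and the helper make_fake_get_logger are hypothetical, not part of the answer): the fake logger stores the name and adds it to every message, keeping the answer's "LEVEL - message" style.

```python
import queue


class NamedFakeLogger(object):
    """Hypothetical variant of Solution 1's FakeLogger that keeps the logger name."""

    def __init__(self, q, name=None):
        self.q = q
        self.name = name or "root"  # getLogger() with no name returns the root logger

    def _put(self, level, item):
        self.q.put("{} - {} - {}".format(level, self.name, item))

    def debug(self, item):
        self._put("DEBUG", item)

    def info(self, item):
        self._put("INFO", item)

    def warning(self, item):
        self._put("WARNING", item)

    def critical(self, item):
        self._put("CRITICAL", item)


def make_fake_get_logger(q):
    """Return a drop-in replacement for logging.getLogger that writes to q."""
    return lambda name=None: NamedFakeLogger(q, name)


if __name__ == "__main__":
    q = queue.Queue()  # across processes, the answer's Manager().Queue() would be used
    get_logger = make_fake_get_logger(q)
    get_logger("local").info("Hey I am logging this")
    get_logger().debug("hmm")
    while not q.empty():
        print(q.get())
```

In func_to_parallelize, the monkey patch would then become logging.getLogger = make_fake_get_logger(logger_q).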

【Comments】:

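【Solution 2】:

I'm late to the party, so you may not need the answer anymore. The problem is that you have already set up a handler in the main process, and in your worker you are adding another one. On Linux each worker Process is forked from the main process and starts with a copy of its logging configuration, so in your worker process there are actually two handlers handling your data: one pushing logs to the queue and one writing to the stream.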
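The "two handlers" can be observed directly. A minimal Python 3 sketch, assuming the fork start method that Python 2.7 always used on Linux (report_root_handlers and handlers_seen_by_child are hypothetical names): a StreamHandler added only in the parent still shows up on the root logger inside the child, because fork copies the parent's logging configuration.

```python
import logging
import multiprocessing as mp


def report_root_handlers(conn):
    # Runs in the child: report the handlers on the inherited root logger.
    conn.send([type(h).__name__ for h in logging.getLogger().handlers])
    conn.close()


def handlers_seen_by_child():
    root = logging.getLogger()
    handler = logging.StreamHandler()  # added once, in the parent only
    root.addHandler(handler)
    try:
        ctx = mp.get_context("fork")  # Unix only
        parent_conn, child_conn = ctx.Pipe()
        p = ctx.Process(target=report_root_handlers, args=(child_conn,))
        p.start()
        names = parent_conn.recv()
        p.join()
        return names
    finally:
        root.removeHandler(handler)  # undo the demo's change to the parent
```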

You can fix this by adding a single line, self.root.handlers = [], to your code. Starting from your original code, the worker's __init__ method then looks like this:

def __init__(self, n, q):
    super(Worker, self).__init__()
    self.n = n
    self.queue = q

    self.qh = QueueHandler(self.queue)
    self.root = logging.getLogger()
    self.root.handlers = []
    self.root.addHandler(self.qh)
    self.root.setLevel(logging.DEBUG)
    self.logger = logging.getLogger("W%i"%self.n)
    

The output now looks like this:

python workers.py
2016-05-12 10:07:02,971 Worker-2   W0 INFO         Worker 0 Starting
2016-05-12 10:07:02,972 Worker-2   W0 INFO         testing 0
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 1
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 2
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 3
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 4
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 5
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 6
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 7
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 8
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 9
2016-05-12 10:07:02,973 Worker-2   W0 INFO         Completed 0
Finish
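One caveat about this fix: Worker.__init__ executes in the parent process, before start() creates the child, so self.root.handlers = [] also empties the main process's root logger. That is likely why the question's MainProcess Test lines no longer appear in the output above: the main process's own messages now go only to the queue, after the listener has already stopped. A minimal Python 3 sketch that does the reconfiguration in run() instead, which executes only in the child. Assumptions: the stdlib logging.handlers.QueueHandler and QueueListener (the Python 3.2+ counterparts of the logutils.queue classes), the fork start method (Unix only), and a hypothetical ListHandler that collects messages in a list in place of the StreamHandler so the result can be checked.

```python
import logging
import logging.handlers
import multiprocessing as mp

ctx = mp.get_context("fork")  # Unix only, like the Linux setup in the question


class ListHandler(logging.Handler):
    """Hypothetical stand-in for StreamHandler: collects messages in a list."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


class Worker(ctx.Process):
    def __init__(self, n, q):
        super().__init__()  # __init__ runs in the parent, so only store state here
        self.n = n
        self.queue = q

    def run(self):
        # run() executes in the child: replacing the inherited handlers here
        # leaves the parent's logging configuration untouched.
        root = logging.getLogger()
        root.handlers = [logging.handlers.QueueHandler(self.queue)]
        root.setLevel(logging.DEBUG)
        logger = logging.getLogger("W%i" % self.n)
        logger.info("Worker %i Starting", self.n)
        for i in range(10):
            logger.info("testing %i", i)
        logger.info("Completed %i", self.n)


def main(n_workers=2):
    q = ctx.Queue()
    sink = ListHandler()
    listener = logging.handlers.QueueListener(q, sink)
    workers = [Worker(i, q) for i in range(n_workers)]
    for w in workers:
        w.start()
    # Start the listener after forking (so fork() never copies a running thread)
    # but before joining, so the queue is drained while the workers exit.
    listener.start()
    for w in workers:
        w.join()
    listener.stop()  # enqueues a sentinel and waits until queued records are handled
    return sink.messages
```

Swapping ListHandler for a logging.FileHandler would give the "master spawns workers that all log to one file" setup described in the comments.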
    

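【Comments】:

  • Late, but right on target :) Thanks :)

【Solution 3】:

All your Workers share the same root logger object (obtained in Worker.__init__; getLogger calls always return the same logger). However, every time you create a Worker, you add a handler (QueueHandler) to that logger.

So if you create 10 Workers, your root logger will have 10 (identical) handlers, which means the output is repeated 10 times.

Instead, you should make the logger a module attribute rather than an instance attribute, and configure it once at module level, not at class level.

(Actually, loggers should be configured once, at the program level.)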
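The mechanism in this answer can be checked in isolation. A minimal Python 3 sketch (count_emitted and the logger name dup_demo are hypothetical): getLogger returns the same object for the same name, every distinct handler attached to it emits its own copy of a record, and Logger.addHandler skips a handler object that is already attached, so duplicates come from separate handler instances.

```python
import io
import logging


def count_emitted(n_handlers, reuse_same_handler=False):
    """Log one record through a logger with n_handlers handlers; count the copies."""
    stream = io.StringIO()
    logger = logging.getLogger("dup_demo")  # hypothetical logger name
    assert logger is logging.getLogger("dup_demo")  # same name, same object
    logger.propagate = False  # keep any root-logger handlers out of the count
    logger.setLevel(logging.INFO)
    shared = logging.StreamHandler(stream)
    handlers = [shared if reuse_same_handler else logging.StreamHandler(stream)
                for _ in range(n_handlers)]
    for h in handlers:
        logger.addHandler(h)  # an already-attached handler object is skipped
    try:
        logger.info("hello")
    finally:
        for h in handlers:
            logger.removeHandler(h)
    return stream.getvalue().count("hello")
```

In the question's one-worker case, though, the extra copy came from the StreamHandler the forked worker inherited from the main process, which is why stopping the repeated addHandler calls in the first edit did not remove it.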

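【Comments】:

  • Could you give a simple snippet? I don't really understand what you mean by "module attribute".
  • Almost there... Now your root logger is a class attribute. Again, move it to module (top-level) scope; it has nothing to do with the class. Also, your listener_process function probably has some duplication as well. I don't know what your program is supposed to do, but I have a feeling you are trying to cram too much into it, especially where logging is concerned... What happens when you replace every logging call with a print? Can you see why?
  • I'm not trying to do anything useful, just to build an example of a multiprocessing scenario where independent workers log to the same handler. For example: a master class spawns 10 workers, which log to a single file.
  • Well, this is driving me mad. I have moved root = logging.getLogger() to the top; I have tried using QueueListener instead of the listener process; all in all, the same result. Any suggestions are welcome.
  • I'm going... any Python geeks to the rescue?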