【问题标题】:How do I log multiple very similar events gracefully in python?如何在 python 中优雅地记录多个非常相似的事件?
【发布时间】:2016-03-09 13:01:43
【问题描述】:

使用 pythons logging 模块,有没有办法将多个事件收集到一个日志条目中?一个理想的解决方案是扩展 python 的 logging 模块或 自定义格式化程序/过滤器,以便在后台收集相同类型的日志事件,不需要添加任何内容代码主体(例如,在每次调用日志记录函数时)。

这里是一个示例,它生成大量相同或非常相似的日志记录事件:

import logging

for i in range(99999): 
    try:
        asdf[i]   # not defined!
    except NameError:
        logging.exception('foo') # generates large number of logging events
    else: pass

# ... more code with more logging ...

for i in range(88888): logging.info('more of the same %d' % i)

# ... and so on ...

所以我们有 99999 次相同的异常并记录它。如果日志只是说这样的话,那就太好了:

ERROR:root:foo (occured 99999 times)
Traceback (most recent call last):
  File "./exceptionlogging.py", line 10, in <module>
    asdf[i]   # not defined!
NameError: name 'asdf' is not defined

INFO:root:foo more of the same (occured 88888 times with various values)

【问题讨论】:

  • 我猜你需要添加一些钩子(可能是自定义 logging.Formatter 子类)来捕获所有记录以推迟它们并合并下一个接收到的记录(如果它“看起来”相同)。跨度>
  • 没错,我想知道是否有人已经写过这样的东西了。
  • 那么,你试过了吗?到目前为止它看起来像什么?我自己没有尝试过这样的事情,但是我做了一个 Formatter 子类来在日志上添加另一个计时器(自程序启动以来的时间)
  • 我没有,目前我手头的时间很少。我只是想,它可能对很多人有用,所以我悬赏它。
  • 我实际上认为这是有害的。您如何知道在执行过程中每个 99999 异常发生的时间?另外,您会将第一个写入日志文件并每次都更新计数器吗?你会写最后一个吗?但是如果没有最后一个呢?如果我们写入标准输出而不是文件怎么办?这里有太多未解决的问题...

标签: python exception logging


【解决方案1】:

您可能应该编写一个消息聚合/统计类,而不是尝试连接到日志系统的singletons,但我猜您可能有一个使用日志的现有代码库。

我还建议您实例化您的记录器,而不是始终使用默认根。 Python Logging Cookbook 有广泛的解释和示例。

以下课程应该按照您的要求进行。

import logging
import atexit
import pprint

class Aggregator(object):
    logs = {}

    @classmethod
    def _aggregate(cls, record):
        id = '{0[levelname]}:{0[name]}:{0[msg]}'.format(record.__dict__)
        if id not in cls.logs: # first occurrence
            cls.logs[id] = [1, record]
        else: # subsequent occurrence
            cls.logs[id][0] += 1

    @classmethod
    def _output(cls):
        for count, record in cls.logs.values():
            record.__dict__['msg'] += ' (occured {} times)'.format(count)
            logging.getLogger(record.__dict__['name']).handle(record)

    @staticmethod
    def filter(record):
        # pprint.pprint(record)
        Aggregator._aggregate(record)
        return False

    @staticmethod
    def exit():
        Aggregator._output()



logging.getLogger().addFilter(Aggregator)
atexit.register(Aggregator.exit)

for i in range(99999): 
    try:
        asdf[i]   # not defined!
    except NameError:
        logging.exception('foo') # generates large number of logging events
    else: pass

# ... more code with more logging ...
for i in range(88888): logging.error('more of the same')

# ... and so on ...    

请注意,在程序退出之前您不会收到任何日志。

运行结果如下:

ERROR:root:foo (发生 99999 次) 回溯(最近一次通话最后): 文件“C:\work\VEMS\python\logcount.py”,第 38 行,在 asdf[i] # 未定义! NameError:名称“asdf”未定义 ERROR:root:more of the same (出现 88888 次)

【讨论】:

  • 我非常喜欢这个解决方案,迈克,谢谢。唯一的杀手是“在程序退出之前你不会得到任何日志”。对于我当前的用例,这很好,但一般的解决方案应该立即记录第一条消息。
  • 非常好。根据需要,应该有一种等待出口的方法,例如在每个 X 条日志之后刷新,或仅在重复相同日志时聚合(在日志消息更改时刷新),或聚合 DEBUG 日志,但在高于此值的任何内容上刷新,或以上所有。
  • @con-f-use,我看到您已将“具有各种值”添加到日志匹配中。可以使用 difflib.SequenceMatcher 之类的东西扩展聚合器的 id 匹配,并忽略数字作为垃圾。
  • @MikeRobins 这个实现不是线程安全的,当发生冲突时可能会导致计数器归零。我建议将 threading.RLock 添加到任何更改 cls.logs 字典数据的操作中。
  • 我尝试使用 difflib 匹配仅在数字上有所不同的日志。似乎 SequenceMatcher 不会为仅在所谓的垃圾中有所不同的小字符串生成紧密匹配。例如 difflib.SequenceMatcher(lambda x: x in ".0123456789", log1, log2).ratio()。也许做一些更hacky的事情,比如用“”替换数字然后比较。
【解决方案2】:

您的问题隐藏了“非常相似”如何定义的潜意识假设。 日志记录可以是 const-only(其实例严格相同),也可以是 const 和变量的混合(完全没有 const 也被视为混合)。

仅 const 日志记录的聚合器是小菜一碟。您只需要决定进程/线程是否会分叉您的聚合。 对于同时包含常量和变量的日志记录,您需要根据记录中的变量决定是否拆分聚合。

字典样式的计数器(来自集合导入计数器)可以用作缓存,它将在 O(1) 中计算您的实例,但如果您愿意,您可能需要一些更高级别的结构来写下变量.此外,您必须手动处理将缓存写入文件 - 每 X 秒(分箱)或一旦程序退出(有风险 - 如果出现问题,您可能会丢失所有内存中的数据)。

聚合框架看起来像这样(在 Python v3.4 上测试):

from logging import Handler
from threading import RLock, Timer
from collections import defaultdict


class LogAggregatorHandler(Handler):

    _default_flush_timer = 300  # Number of seconds between flushes
    _default_separator = "\t"  # Seperator char between metadata strings
    _default_metadata = ["filename", "name", "funcName", "lineno", "levelname"]  # metadata defining unique log records

    class LogAggregatorCache(object):
        """ Keeps whatever is interesting in log records aggregation. """
        def __init__(self, record=None):
            self.message = None
            self.counter = 0
            self.timestamp = list()
            self.args = list()
            if record is not None:
                self.cache(record)

        def cache(self, record):
            if self.message is None:  # Only the first message is kept
                self.message = record.msg
            assert self.message == record.msg, "Non-matching log record"  # note: will not work with string formatting for log records; e.g. "blah {}".format(i)
            self.timestamp.append(record.created)
            self.args.append(record.args)
            self.counter += 1

        def __str__(self):
            """ The string of this object is used as the default output of log records aggregation. For example: record message with occurrences. """
            return self.message + "\t (occurred {} times)".format(self.counter)

    def __init__(self, flush_timer=None, separator=None, add_process_thread=False):
        """
        Log record metadata will be concatenated to a unique string, separated by self._separator.
        Process and thread IDs will be added to the metadata if set to True; otherwise log records across processes/threads will be aggregated together.
        :param separator: str
        :param add_process_thread: bool
        """
        super().__init__()
        self._flush_timer = flush_timer or self._default_flush_timer
        self._cache = self.cache_factory()
        self._separator = separator or self._default_separator
        self._metadata = self._default_metadata
        if add_process_thread is True:
            self._metadata += ["process", "thread"]
        self._aggregation_lock = RLock()
        self._store_aggregation_timer = self.flush_timer_factory()
        self._store_aggregation_timer.start()

        # Demo logger which outputs aggregations through a StreamHandler:
        self.agg_log = logging.getLogger("aggregation_logger")
        self.agg_log.addHandler(logging.StreamHandler())
        self.agg_log.setLevel(logging.DEBUG)
        self.agg_log.propagate = False

    def cache_factory(self):
        """ Returns an instance of a new caching object. """
        return defaultdict(self.LogAggregatorCache)

    def flush_timer_factory(self):
        """ Returns a threading.Timer daemon object which flushes the Handler aggregations. """
        timer = Timer(self._flush_timer, self.flush)
        timer.daemon = True
        return timer

    def find_unique(self, record):
        """ Extracts a unique metadata string from log records. """
        metadata = ""
        for single_metadata in self._metadata:
            value = getattr(record, single_metadata, "missing " + str(single_metadata))
            metadata += str(value) + self._separator
        return metadata[:-len(self._separator)]

    def emit(self, record):
        try:
            with self._aggregation_lock:
                metadata = self.find_unique(record)
                self._cache[metadata].cache(record)
        except Exception:
            self.handleError(record)

    def flush(self):
        self.store_aggregation()

    def store_aggregation(self):
        """ Write the aggregation data to file. """
        self._store_aggregation_timer.cancel()
        del self._store_aggregation_timer
        with self._aggregation_lock:
            temp_aggregation = self._cache
            self._cache = self.cache_factory()

        # ---> handle temp_aggregation and write to file <--- #
        for key, value in sorted(temp_aggregation.items()):
            self.agg_log.info("{}\t{}".format(key, value))

        # ---> re-create the store_aggregation Timer object <--- #
        self._store_aggregation_timer = self.flush_timer_factory()
        self._store_aggregation_timer.start()

在 for 循环中使用随机日志严重性测试这个 Handler 类:

if __name__ == "__main__":
    import random
    import logging

    logger = logging.getLogger()
    handler = LogAggregatorHandler()
    logger.addHandler(handler)
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.DEBUG)

    logger.info("entering logging loop")

    for i in range(25):
        # Randomly choose log severity:
        severity = random.choice([logging.DEBUG, logging.INFO, logging.WARN, logging.ERROR, logging.CRITICAL])
        logger.log(severity, "test message number %s", i)

    logger.info("end of test code")

如果您想添加更多内容,Python 日志记录如下所示:

{'args': ['()'],
 'created': ['1413747902.18'],
 'exc_info': ['None'],
 'exc_text': ['None'],
 'filename': ['push_socket_log.py'],
 'funcName': ['<module>'],
 'levelname': ['DEBUG'],
 'levelno': ['10'],
 'lineno': ['17'],
 'module': ['push_socket_log'],
 'msecs': ['181.387901306'],
 'msg': ['Test message.'],
 'name': ['__main__'],
 'pathname': ['./push_socket_log.py'],
 'process': ['65486'],
 'processName': ['MainProcess'],
 'relativeCreated': ['12.6709938049'],
 'thread': ['140735262810896'],
 'threadName': ['MainThread']}

还有一点需要考虑: 您运行的大多数功能都依赖于几个连续命令的流(理想情况下,它们会相应地报告日志记录);例如客户端-服务器通信通常取决于接收请求、处理请求、从数据库读取一些数据(这需要连接和一些读取命令)、某种解析/处理、构造响应数据包和报告响应代码。

这凸显了使用聚合方法的主要缺点之一:通过聚合日志记录,您会失去对所发生操作的时间和顺序的跟踪。如果您手头只有聚合,那么很难弄清楚哪个请求的结构不正确。 在这种情况下,我的建议是保留原始数据和聚合(使用两个文件处理程序或类似的东西),以便您可以调查宏观级别(聚合)和微观级别(正常日志记录)。

但是,您仍然有责任找出问题所在,然后手动查找导致问题的原因。在您的 PC 上开发时,这是一项非常简单的任务;但是在多个生产服务器中部署您的代码会使这些任务变得繁琐,浪费您的大量时间。 因此,有几家公司专门为日志管理开发产品。大多数将相似的日志记录聚合在一起,但其他一些则结合了机器学习算法,用于自动聚合和学习软件的行为。外包您的日志处理可以让您专注于您的产品,而不是您的错误。

免责声明:我为 Coralogix 工作,就是这样一种解决方案。

【讨论】:

  • 感谢您分享您的见解。至于“非常相似”是如何定义的>>潜意识假设
  • @con-f-use 我已经测试了代码,为 Python v3.4 修复了它并添加了一些小功能。即可配置flush_timer,flush()方法避免退出时数据丢失,在屏幕上通过logging.StreamHandler打印聚合数据。
【解决方案3】:

您可以继承 logger 类并覆盖异常方法以将您的错误类型放入缓存中,直到它们到达某个计数器后再发送到日志。

import logging
from collections import defaultdict

MAX_COUNT = 99999


class MyLogger(logging.getLoggerClass()):
    def __init__(self, name):
        super(MyLogger, self).__init__(name)
        self.cache = defaultdict(int)

    def exception(self, msg, *args, **kwargs):
        err = msg.__class__.__name__
        self.cache[err] += 1
        if self.cache[err] > MAX_COUNT:
            new_msg = "{err} occurred {count} times.\n{msg}"
            new_msg = new_msg.format(err=err, count=MAX_COUNT, msg=msg)
            self.log(logging.ERROR, new_msg, *args, **kwargs)
            self.cache[err] = None


log = MyLogger('main')
try:
    raise TypeError("Useful error message")
except TypeError as err:
    log.exception(err)

请注意,这不是复制粘贴代码。
您需要自己添加处理程序(我也推荐格式化程序)。
https://docs.python.org/2/howto/logging.html#handlers

玩得开心。

【讨论】:

    【解决方案4】:

    创建一个计数器并仅记录 count=1,然后递增并在 finally 块中写入(以确保无论应用程序崩溃和烧毁多么严重,它都会被记录)。如果您出于不同的原因遇到相同的异常,这当然可能会造成问题,但您始终可以搜索行号以验证它是相同的问题或类似的问题。一个最小的例子:

    name_error_exception_count = 0
    try:
        for i in range(99999): 
            try:
                asdf[i]   # not defined!
            except NameError:
                name_error_exception_count += 1
                if name_error_exception_count == 1:
                    logging.exception('foo') 
            else: pass
    except Exception:
        pass  # this is just to get the finally block, handle exceptions here too, maybe
    finally:
        if name_error_exception_count > 0:
            logging.exception('NameError exception occurred {} times.'.format(name_error_exception_count))
    

    【讨论】:

    • 这将是最简单的解决方案,如果我在源代码的一个位置只存在这种类型的多重异常,那将是最好的解决方案。具有logging 模块的自定义过滤器/格式化程序的解决方案会更好。
    • @con-f-use 那么您需要修改您的示例以考虑更真实的代码表示,以便答案可以解决您实际需要的内容。例如,处理日志记录和跟踪多次出现的类或函数装饰器可能更合适,但您的示例既没有显示函数也没有显示带有方法的类。
    • @con-f-use 尽管已编辑帖子,但它确实没有进一步深入了解问题。您当然可以编写自己的记录器,该记录器使用此答案的原则来避免记录来自相同函数/方法的重复异常,但您的示例甚至没有说明您的代码是否从函数或类方法内部运行,这可能会影响某人如何编写记录器或替代解决方案,例如记录装饰器、上下文管理器或其他选项。
    • @con-f-use 是的,但它如何解析调用函数是另一回事。如果它在一个有子类调用它的类中,是否应该出现异常propogate?如果是这样,它如何在不知道调用类的情况下知道它是否真的是重复的?如何解决这个问题(例如通过inspect)取决于你打算如何使用它。上下文管理器最适合所有用途,但如果您甚至没有类方法或函数,您可能不需要那么复杂的东西。
    • 鉴于您的问题中显示的复杂性,我的回答效果很好。 Bad MCV,不好的答案。
    猜你喜欢
    • 2019-05-17
    • 1970-01-01
    • 1970-01-01
    • 2015-03-07
    • 1970-01-01
    • 1970-01-01
    • 2012-06-03
    • 2010-12-08
    相关资源
    最近更新 更多