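Your question hides an implicit assumption about how "very similar" is defined.
Log records can either be const-only (their instances are strictly identical) or a mix of consts and variables (a record with no consts at all also counts as a mix).
An aggregator for const-only log records is a piece of cake. You only need to decide whether the process/thread should split your aggregation.
For log records that contain both consts and variables, you need to decide whether to split the aggregation based on the variables in the record.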
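The two decisions above can be expressed as a key function. The following is only a minimal sketch: `aggregation_key`, `make_record` and the two flags are hypothetical names chosen for illustration, and it assumes records are logged with a message template plus arguments rather than a pre-formatted string.

```python
import logging


def aggregation_key(record, split_by_thread=False, split_by_args=False):
    """Build a hashable key; records with equal keys are aggregated together."""
    key = [record.name, record.levelname, record.msg]  # the constant part
    if split_by_thread:
        key += [record.process, record.thread]
    if split_by_args:
        key.append(repr(record.args))  # the variable part
    return tuple(key)


def make_record(msg, *args):
    """Create a standalone LogRecord for demonstration purposes."""
    return logging.LogRecord("demo", logging.INFO, "demo.py", 1, msg, args, None)


a = make_record("user %s logged in", "alice")
b = make_record("user %s logged in", "bob")
print(aggregation_key(a) == aggregation_key(b))
print(aggregation_key(a, split_by_args=True) == aggregation_key(b, split_by_args=True))
```

With the default flags both records share one key; with `split_by_args=True` each distinct argument tuple gets its own bucket.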
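A dictionary-style counter (from collections import Counter) can serve as a cache that counts your instances in O(1), but you may need a higher-level structure if you also want to record the variables. In addition, you have to handle writing the cache to a file yourself: either every X seconds (binning) or once when the program exits (risky, since you may lose all in-memory data if something gets stuck).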
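As a rough illustration of the Counter-as-cache idea, here is a small sketch. `CountingHandler` and its `dump` method are hypothetical names, and an in-memory `io.StringIO` stands in for the output file; when `dump` gets called (from a timer, or at exit) is left open.

```python
import io
import logging
from collections import Counter


class CountingHandler(logging.Handler):
    """Counts records per (level, message template) in O(1) per record."""

    def __init__(self):
        super().__init__()
        self.counts = Counter()

    def emit(self, record):
        # record.msg is the unformatted template, so variables do not split the count
        self.counts[(record.levelname, record.msg)] += 1

    def dump(self, stream):
        """Write the current counts to `stream` and start a fresh cache."""
        self.acquire()  # the handler lock also guards emit()
        try:
            counts, self.counts = self.counts, Counter()
        finally:
            self.release()
        for (level, msg), n in sorted(counts.items()):
            stream.write("{}\t{}\t{}\n".format(level, msg, n))


log = logging.getLogger("counter_demo")
log.propagate = False
log.setLevel(logging.INFO)
handler = CountingHandler()
log.addHandler(handler)

for i in range(3):
    log.info("cache miss for key %s", i)
log.warning("disk almost full")

out = io.StringIO()
handler.dump(out)
print(out.getvalue())
```

Swapping the cache out before writing keeps the lock held only briefly, so logging threads are not blocked while the output is written.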
The aggregation skeleton would look something like this (tested on Python v3.4):

    import logging
    from logging import Handler
    from threading import RLock, Timer
    from collections import defaultdict


    class LogAggregatorHandler(Handler):

        _default_flush_timer = 300  # Number of seconds between flushes
        _default_separator = "\t"  # Separator char between metadata strings
        _default_metadata = ["filename", "name", "funcName", "lineno", "levelname"]  # metadata defining unique log records

        class LogAggregatorCache(object):
            """ Keeps whatever is interesting in log records aggregation. """
            def __init__(self, record=None):
                self.message = None
                self.counter = 0
                self.timestamp = list()
                self.args = list()
                if record is not None:
                    self.cache(record)

            def cache(self, record):
                if self.message is None:  # Only the first message is kept
                    self.message = record.msg
                # note: will not work with pre-formatted log messages; e.g. "blah {}".format(i)
                assert self.message == record.msg, "Non-matching log record"
                self.timestamp.append(record.created)
                self.args.append(record.args)
                self.counter += 1

            def __str__(self):
                """ The string of this object is used as the default output of log records aggregation. For example: record message with occurrences. """
                return self.message + "\t (occurred {} times)".format(self.counter)

        def __init__(self, flush_timer=None, separator=None, add_process_thread=False):
            """
            Log record metadata will be concatenated to a unique string, separated by self._separator.
            Process and thread IDs will be added to the metadata if set to True; otherwise log records across processes/threads will be aggregated together.
            :param flush_timer: number of seconds between flushes
            :param separator: str
            :param add_process_thread: bool
            """
            super().__init__()
            self._flush_timer = flush_timer or self._default_flush_timer
            self._cache = self.cache_factory()
            self._separator = separator or self._default_separator
            # Copy the list so the class-level default is not mutated below
            self._metadata = list(self._default_metadata)
            if add_process_thread is True:
                self._metadata += ["process", "thread"]
            self._aggregation_lock = RLock()
            self._store_aggregation_timer = self.flush_timer_factory()
            self._store_aggregation_timer.start()

            # Demo logger which outputs aggregations through a StreamHandler:
            self.agg_log = logging.getLogger("aggregation_logger")
            self.agg_log.addHandler(logging.StreamHandler())
            self.agg_log.setLevel(logging.DEBUG)
            self.agg_log.propagate = False

        def cache_factory(self):
            """ Returns an instance of a new caching object. """
            return defaultdict(self.LogAggregatorCache)

        def flush_timer_factory(self):
            """ Returns a threading.Timer daemon object which flushes the Handler aggregations. """
            timer = Timer(self._flush_timer, self.flush)
            timer.daemon = True
            return timer

        def find_unique(self, record):
            """ Extracts a unique metadata string from log records. """
            return self._separator.join(
                str(getattr(record, single_metadata, "missing " + str(single_metadata)))
                for single_metadata in self._metadata)

        def emit(self, record):
            try:
                with self._aggregation_lock:
                    metadata = self.find_unique(record)
                    self._cache[metadata].cache(record)
            except Exception:
                self.handleError(record)

        def flush(self):
            self.store_aggregation()

        def store_aggregation(self):
            """ Write the aggregation data to file. """
            self._store_aggregation_timer.cancel()
            del self._store_aggregation_timer
            with self._aggregation_lock:
                # Swap in a fresh cache so logging can continue while we write
                temp_aggregation = self._cache
                self._cache = self.cache_factory()

            # ---> handle temp_aggregation and write to file <--- #
            for key, value in sorted(temp_aggregation.items()):
                self.agg_log.info("{}\t{}".format(key, value))

            # ---> re-create the store_aggregation Timer object <--- #
            self._store_aggregation_timer = self.flush_timer_factory()
            self._store_aggregation_timer.start()
Testing this Handler class with random log severities in a for-loop:

    if __name__ == "__main__":
        import random
        import logging

        logger = logging.getLogger()
        handler = LogAggregatorHandler()
        logger.addHandler(handler)
        logger.addHandler(logging.StreamHandler())
        logger.setLevel(logging.DEBUG)

        logger.info("entering logging loop")

        for i in range(25):
            # Randomly choose log severity:
            severity = random.choice([logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL])
            logger.log(severity, "test message number %s", i)

        logger.info("end of test code")
In case you want to add more stuff, this is what a Python log record looks like:

    {'args': ['()'],
     'created': ['1413747902.18'],
     'exc_info': ['None'],
     'exc_text': ['None'],
     'filename': ['push_socket_log.py'],
     'funcName': ['<module>'],
     'levelname': ['DEBUG'],
     'levelno': ['10'],
     'lineno': ['17'],
     'module': ['push_socket_log'],
     'msecs': ['181.387901306'],
     'msg': ['Test message.'],
     'name': ['__main__'],
     'pathname': ['./push_socket_log.py'],
     'process': ['65486'],
     'processName': ['MainProcess'],
     'relativeCreated': ['12.6709938049'],
     'thread': ['140735262810896'],
     'threadName': ['MainThread']}
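To see which attributes are available on your own Python version, one option is to capture a record and list its `__dict__`. This is just a sketch; `CaptureHandler` and the logger name `inspect_demo` are made up for the example, and the exact set of attributes may differ between Python versions.

```python
import logging


class CaptureHandler(logging.Handler):
    """Keeps every record it receives so the records can be inspected later."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


log = logging.getLogger("inspect_demo")
log.propagate = False
log.setLevel(logging.DEBUG)
capture = CaptureHandler()
log.addHandler(capture)

log.debug("Test message.")

# vars() returns the record's attribute dictionary
attributes = vars(capture.records[0])
for name in sorted(attributes):
    print("{:>16}: {!r}".format(name, attributes[name]))
```

Any of the printed attribute names can be added to the metadata list that defines a unique log record.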
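One more thing to think about:
Most features you run depend on a flow of several consecutive commands (which will ideally report log records accordingly); e.g. a client-server communication typically depends on receiving a request, processing it, reading some data from the DB (which requires a connection and some read commands), some kind of parsing/processing, constructing the response packet and reporting the response code.
This highlights one of the main disadvantages of the aggregation approach: by aggregating log records you lose track of the time and order of the actions that took place. It will be extremely difficult to figure out which request was incorrectly structured if you only have the aggregation at hand.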
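My advice in this case is to keep both the raw data and the aggregation (using two file handlers or something similar), so that you can investigate at the macro level (aggregation) and at the micro level (normal logging).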
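A minimal sketch of that setup, assuming one logger with two handlers attached: `TemplateCounter` is a hypothetical stand-in for the aggregating handler, and an in-memory `io.StringIO` stream stands in for the raw log file.

```python
import io
import logging
from collections import Counter


class TemplateCounter(logging.Handler):
    """Macro level: counts occurrences per message template."""

    def __init__(self):
        super().__init__()
        self.counts = Counter()

    def emit(self, record):
        self.counts[record.msg] += 1


# Micro level: every record is written in order (a file handler in real use)
raw_stream = io.StringIO()
raw_handler = logging.StreamHandler(raw_stream)
raw_handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

aggregate_handler = TemplateCounter()

log = logging.getLogger("dual_demo")
log.propagate = False
log.setLevel(logging.INFO)
log.addHandler(raw_handler)
log.addHandler(aggregate_handler)

for request_id in (1, 2, 3):
    log.info("received request %s", request_id)
    log.info("sent response for request %s", request_id)

raw_lines = raw_stream.getvalue().splitlines()
print(raw_lines)
print(dict(aggregate_handler.counts))
```

The raw stream preserves the order and the request IDs, while the counter only knows that each template occurred three times.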
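However, you are still left with the responsibility of noticing that something has gone wrong and then manually investigating what caused it. When developing on your PC this is an easy enough task, but deploying your code on several production servers makes these tasks cumbersome and wastes a lot of your time.
Accordingly, there are several companies developing products specifically for log management. Most aggregate similar log records together, but others incorporate machine learning algorithms for automatic aggregation and for learning your software's behavior. Outsourcing your log handling can then enable you to focus on your product instead of on your bugs.
Disclaimer: I work for Coralogix, one such solution.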