这是一个多进程安全的 Python 日志 handler,按时间(按天)切割日志文件。
官方的 TimedRotatingFileHandler 在多进程下疯狂报错,
不信的话可以试试官方 TimedRotatingFileHandler 多进程写入文件日志,设置成每秒换一个新的文件写(主要是按天来切割要耽误很长的时间才能观察错误)




此日志 handler 先把日志批量聚合,每隔 1 秒写入一次文件;在超高速写入的场景下,写入速度远超官方 handler。

import os
import time
from pathlib import Path
import queue
import re
import atexit
from threading import Lock, Thread
from nb_filelock import FileLock
import logging
# noinspection PyUnresolvedReferences
from logging import LogRecord, FileHandler


# noinspection PyPep8Naming
class ConcurrentDayRotatingFileHandler(logging.Handler):
    """
    Logging handler that writes to one file per calendar day and is meant to
    be used from several processes at once.

    Instead of renaming a shared file at rollover time, every process appends
    to ``<file_path>/<file_name>.<YYYY-MM-DD>``; old files are pruned under a
    cross-process file lock.  Records are buffered in an in-memory queue and a
    single daemon thread per process flushes all handlers about once a second,
    so a burst of log calls costs one file write instead of one per record.

    :param file_name: base name of the log file; the date is appended to it.
    :param file_path: directory that holds the log files (created on first write).
    :param back_count: number of dated files to keep; ``0`` or less keeps all
        of them, as with the standard-library rotating handlers.
    """
    # Every handler created in this process; the flusher thread walks this list.
    file_handler_list = []
    # PIDs for which a flusher thread has been started.  Keyed by PID so it
    # works on both Windows and Linux: a spawned process starts with a fresh
    # set, while a forked child inherits a copy that holds only ancestor PIDs.
    has_start_emit_all_file_handler_process_id_set = set()
    # Kept for backward compatibility; no method of this class acquires it.
    __lock_for_rotate = Lock()

    @classmethod
    def _emit_all_file_handler(cls):
        """Flusher loop: write out every handler's buffer, then sleep one second, forever."""
        while True:
            # Iterate over a snapshot so handlers created meanwhile cannot disturb the loop.
            for hr in list(cls.file_handler_list):
                # noinspection PyBroadException,PyProtectedMember
                try:
                    hr._write_to_file()
                except Exception:
                    # One failing flush (disk full, permissions, ...) must not kill
                    # the only flusher thread of this process.
                    if logging.raiseExceptions:
                        import traceback
                        traceback.print_exc()
            time.sleep(1)  # batching once per second is what makes this handler fast

    @classmethod
    def _start_emit_all_file_handler(cls):
        """Start the daemon thread that runs :meth:`_emit_all_file_handler`."""
        Thread(target=cls._emit_all_file_handler, daemon=True).start()

    @classmethod
    def _ensure_emit_thread_for_current_process(cls):
        """Start the flusher thread unless this PID already has one."""
        pid = os.getpid()
        if pid not in cls.has_start_emit_all_file_handler_process_id_set:
            cls._start_emit_all_file_handler()
            cls.has_start_emit_all_file_handler_process_id_set.add(pid)

    # noinspection PyMissingConstructor
    def __init__(self, file_name: str, file_path: str, back_count=10):
        super().__init__()
        self.file_name = file_name
        self.file_path = file_path
        self.backupCount = back_count
        # Suffix written in normal operation: YYYY-MM-DD (optionally followed by ".ext").
        self.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}(\.\w+)?$", re.ASCII)
        # Suffix HH-MM-SS, produced when the per-second test format is enabled in _write_to_file.
        self.extMatch2 = re.compile(r"^\d{2}-\d{2}-\d{2}(\.\w+)?$", re.ASCII)

        self.buffer_msgs_queue = queue.Queue()
        # Date string of the last flush that also pruned old files.
        self._last_cleanup_time_str = None
        # A short-lived program may exit before the next one-second flush;
        # this hook writes whatever is still buffered at interpreter exit.
        atexit.register(self._write_to_file)
        self.file_handler_list.append(self)
        self._ensure_emit_thread_for_current_process()

    def emit(self, record: LogRecord):
        """
        Format *record* and queue it; the flusher thread does the actual file write.

        :param record: the record to log.
        :return: None
        """
        # noinspection PyBroadException
        try:
            msg = self.format(record)
            self.buffer_msgs_queue.put(msg)
            # Threads do not survive fork(), so a handler inherited by a forked
            # worker has no flusher thread until one is started here.
            self._ensure_emit_thread_for_current_process()
        except Exception:
            self.handleError(record)

    def _write_to_file(self):
        """Drain the queue and append the messages to today's file; prune old files when needed."""
        pending = []
        while True:
            try:
                pending.append(self.buffer_msgs_queue.get(block=False))
            except queue.Empty:
                break
        if not pending:
            return
        time_str = time.strftime('%Y-%m-%d')
        # For a quick manual test use time.strftime('%H-%M-%S') instead: one file per second.
        log_dir = Path(self.file_path)
        log_dir.mkdir(parents=True, exist_ok=True)
        path_obj = log_dir / (self.file_name + '.' + time_str)
        # NOTE(review): the file is opened with the platform default encoding — confirm that is intended.
        with path_obj.open(mode='a') as f:
            f.write('\n'.join(pending) + '\n')
        # The set of dated files only changes when a new date shows up, so the
        # directory scan and the cross-process lock are needed once per date,
        # not on every flush.
        if time_str != getattr(self, '_last_cleanup_time_str', None):
            with FileLock(log_dir / f'_delete_{self.file_name}.lock'):
                self._find_and_delete_files()
            self._last_cleanup_time_str = time_str

    def _find_and_delete_files(self):
        """
        Delete the oldest dated log files so that at most ``backupCount`` remain.

        Only files named ``<file_name>.<suffix>`` whose suffix matches
        ``extMatch`` or ``extMatch2`` are considered.  A ``backupCount`` of 0
        or less disables pruning.
        """
        if self.backupCount <= 0:
            return
        prefix = self.file_name + "."
        plen = len(prefix)
        candidates = []
        for entry in os.listdir(self.file_path):
            if not entry.startswith(prefix):
                continue
            suffix = entry[plen:]
            if self.extMatch.match(suffix) or self.extMatch2.match(suffix):
                candidates.append(os.path.join(self.file_path, entry))
        excess = len(candidates) - self.backupCount
        if excess <= 0:
            return
        # The date suffix sorts lexicographically, so the oldest files come first.
        candidates.sort()
        for stale in candidates[:excess]:
            try:
                Path(stale).unlink()
            except FileNotFoundError:
                # Already removed (e.g. by another process or by hand) — nothing left to do.
                pass


from concurrent.futures import ProcessPoolExecutor
from logging.handlers import TimedRotatingFileHandler

# Demo / benchmark wiring: every worker process logs through the 'lala' logger,
# which writes via the multi-process-safe handler defined above.
logger = logging.getLogger('lala')
file_handler = ConcurrentDayRotatingFileHandler(file_name='test_my_cd.log', file_path='/pythonlogs')
# To compare throughput with the standard-library handlers, use one of these instead:
# file_handler = FileHandler('/pythonlogs/test_fhh.log')
# file_handler = TimedRotatingFileHandler('/pythonlogs/test_fhh.log')
file_handler.setFormatter(
    logging.Formatter(fmt='[%(asctime)s] - %(filename)s] - %(levelname)s: %(message)s')
)
logger.addHandler(file_handler)

# Ten worker processes hammer the same log file concurrently.
pool = ProcessPoolExecutor(max_workers=10)


def fun(x):
    """Worker body: log 100000 warnings of the form "<x> <counter>" through the module-level logger."""
    for counter in range(100_000):
        # Uncomment to slow the producer down while watching the files: time.sleep(0.2)
        message = f"{x} {counter}"
        logger.warning(message)


if __name__ == '__main__':
    # Benchmark driver: run fun(0) .. fun(9) in the process pool and print
    # wall-clock timestamps before and after.
    print('开始', time.strftime('%H_%M_%S'))
    futures = [pool.submit(fun, j) for j in range(10)]
    pool.shutdown()  # waits until every submitted task has finished
    for future in futures:
        # A discarded future hides any exception raised in its worker;
        # result() re-raises it here so a failed run is not reported as a success.
        future.result()
    print('结束', time.strftime('%H_%M_%S'))

 

相关文章:

  • 2021-04-27
  • 2022-12-23
  • 2022-01-01
  • 2022-12-23
  • 2022-12-23
  • 2021-12-23
  • 2022-12-23
  • 2022-12-23
猜你喜欢
  • 2022-12-23
  • 2021-11-30
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-04-01
相关资源
相似解决方案