【问题标题】:Python - Using Streamhandler in multiprocessing environmentPython - 在多处理环境中使用 Streamhandler
【发布时间】:2015-04-28 17:36:17
【问题描述】:

我有一个 CLI 脚本,它将所有进程记录到一个日志文件中。 CLI 的功能之一是通过以下方式上传大文件 将其分成几部分并并行上传。 在 linux 中,整个事情就像一个魅力,但在 windows 我 似乎无法使用Streamhandler流式传输子进程 (_upload_for_multipart) 的日志条目 来自logging 模块。 logger.info 中的语句 _upload_for_multipart 已正确登录到日志文件 (my_logfile.txt) 但是当 verbose 参数 选择 CLI。所有其他语句(在其他函数中)都是流式传输和记录的。 有什么帮助吗? 下面是我面临的问题的完整工作示例。您不需要任何额外的库来运行它。

import argparse, glob, logging, math, os
from timeit import default_timer as timer
from filechunkio.filechunkio import FileChunkIO
from multiprocessing import cpu_count, Pool, freeze_support, current_process
from sys import exit, exc_info, argv, stdout, version_info, stdout, platform
from time import mktime, strptime

logger = None

def _upload_for_multipart(keyname, offset, multipart, part_num,bytes, parts):
    try:
        with FileChunkIO(keyname, 'r', offset=offset, bytes=bytes) as fp:
                try:
                    start = timer()
                    logger.info( 'Uploading part {0}/{1}'.format ( part_num, parts ) )
                    logger.info('Uploading im MP')
                    end = timer()
                except Exception as e:
                    logger.error('Some error occured')
                    exit()
        logger.info( 'UPLOADED part {0}/{1} time = {2:0.1f}s Size: {3}'.format (part_num, parts, (end - start), bytes ) )
    except Exception as e:
        logger.error( 'FAILED uploading {0}.{1}'.format(keyname), e )
        exit(1)

def _upload_part(argFile, argBucket, **core_chunk):
    file_path = argFile
    bucket_name = argBucket
    file_name = os.path.basename( file_path )
    source_size = os.stat( file_path ).st_size

    chunk_size = max(int(math.sqrt(5242880) * math.sqrt(source_size)),
                         5242880)

    chunk_amount = int(math.ceil(source_size / float(chunk_size)))
    #mp = s3_bucket.initiate_multipart_upload( file_name )
    mp = ''
    logger.info('Initiate multipart upload')
    logger.info( 'File size of {0} is {1}. Parallel uploads will be used to speed up the process'\
            .format( file_name, source_size  ) )

    start_time = timer()
    pool = Pool(processes=1, initializer = init_log, initargs = ( logFile, ) )
    for i in range( chunk_amount ):
        offset = i * chunk_size
        remaining_bytes = source_size - offset
        bytes = min( [chunk_size, remaining_bytes] )
        part_num = i + 1
        start = timer()
        pool.apply_async( _upload_for_multipart, [file_name, offset, mp, part_num, bytes, chunk_amount] )
    pool.close()
    pool.join()
    end = timer()
    logger.info('Process complete')

def _get_logger( pdir, ldir, lname, level, fmt ):
    try:
        logs_dir = os.path.join( pdir, ldir )
        if not os.path.exists( logs_dir ):
            os.makedirs( logs_dir )
    except Exception as e:
        print ('{}'.format(e))
        exit(1)

    logging.basicConfig(
            filename=os.path.join(logs_dir, lname),
            level=level,
            format=fmt
    )
    return logging.getLogger( lname )

def init_log(logFile):
    global logger
    exec_file = os.path.abspath( argv[0] )
    exec_dir = os.path.dirname( exec_file )
    default_logger = dict( pdir=exec_dir, ldir='logs', lname='error.log', level='ERROR',
                           fmt='%(asctime)s %(levelname)s: %(message)s' )

    log_filename = logFile
    level = 'INFO'
    format = '%(asctime)s %(levelname)s: %(message)s'

    default_logger.update( fmt=format, level=level, lname = log_filename )
    if os.path.isabs( log_filename ):
        bdir, log_filename = os.path.split( log_filename )
        default_logger.update(pdir='', ldir = bdir, lname = log_filename )
    logger = _get_logger( **default_logger )

if __name__ == "__main__":

    freeze_support()

    parser = argparse.ArgumentParser( description="CLI." )
    group = parser.add_mutually_exclusive_group()
    group.add_argument( "-v", "--verbose", action='store_const', const=True, help="Output process messages to stdout \
                        channel")
    args = parser.parse_args()

    logFile = 'mylogfile.txt'
    init_log(logFile)

    bucket_name = 'some-bucket'

    if args.verbose:
        try:
            print_handler = logging.StreamHandler( stdout )
            print_handler.setLevel( logging.DEBUG )
            formatter = logging.Formatter( '%(asctime)s %(levelname)s: %(message)s' )
            print_handler.setFormatter( formatter )
            logger.addHandler( print_handler )
        except (NoOptionError, NoSectionError) as e:
            logger.exception( e )

    logger.info('Establishing Connection')
    _upload_part('large_testfile.log', bucket_name)

【问题讨论】:

    标签: python windows logging multiprocessing


    【解决方案1】:

    StreamHandler 在子进程中不起作用,因为您只是在父进程中设置它。您需要在init_log 中进行所有日志设置,以使其在子级中生效:

    # ... This stuff is the same...    
    
    def _upload_part(argFile, argBucket, verbose, **core_chunk):  # Add verbose argument
        #... Same until you declare the Pool
        pool = Pool(processes=1, initializer=init_log, initargs=(logFile, verbose))  # Add verbose to initargs
       # All the same ...
    
    def init_log(logFile, verbose):
        global logger
        exec_file = os.path.abspath( argv[0] )
        exec_dir = os.path.dirname( exec_file )
        default_logger = dict( pdir=exec_dir, ldir='logs', lname='error.log', level='ERROR',
                               fmt='%(asctime)s %(levelname)s: %(message)s' )
    
        log_filename = logFile
        level = 'INFO'
        format = '%(asctime)s %(levelname)s: %(message)s'
    
        default_logger.update( fmt=format, level=level, lname = log_filename )
        if os.path.isabs( log_filename ):
            bdir, log_filename = os.path.split( log_filename )
            default_logger.update(pdir='', ldir = bdir, lname = log_filename )
        logger = _get_logger( **default_logger )
    
        if verbose:  # Set up StreamHandler here
            try:
                print_handler = logging.StreamHandler(stdout)
                print_handler.setLevel(logging.DEBUG)
                formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
                print_handler.setFormatter(formatter)
                logger.addHandler(print_handler)
            except (NoOptionError, NoSectionError) as e:
                logger.exception(e)
    
    if __name__ == "__main__":
    
        freeze_support()
    
        parser = argparse.ArgumentParser( description="CLI." )
        group = parser.add_mutually_exclusive_group()
        group.add_argument( "-v", "--verbose", action='store_const', const=True, help="Output process messages to stdout \
                            channel")
        args = parser.parse_args()
    
        logFile = 'mylogfile.txt'
        init_log(logFile)
    
        bucket_name = 'some-bucket'
    
        logger.info('Establishing Connection')
        _upload_part('large_testfile.log', bucket_name, args.verbose)  # Pass args.verbose
    

    【讨论】:

    • 你先生,真的是多处理之神。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-04-07
    • 1970-01-01
    相关资源
    最近更新 更多