【发布时间】:2019-06-08 17:33:16
【问题描述】:
我有一个我一直在从事的 python3 (3.4.5) 项目,它利用 multiprocessing.Pool 通过 4 名工人运行大约 50 多个工作。我有一个带有logging.handlers.QueueListener 的单独进程设置,因此我可以通过与multiprocessing.Manager() 一起使用的Queue 将全局内容记录到单个文件中。所以基本上流程是这样的
- 主程序启动
- 通过
multiprocessing.Manager()创建Queue - 使用
QueueListener启动专用日志记录进程,监听我刚刚为全局日志创建的Queue。 (我也试过这个,只是使用主程序的一个线程,结果相同。) - 创建一个
multiprocessing.Pool来处理单个作业,将之前创建的Queue和必要的配置信息传递给它们以运行和设置它们的日志记录(有一个全局日志,以及每个作业的单个日志,其中包含更详细的信息)。作业以map_async开始。 - 等待所有作业处理完毕,然后执行一些最后步骤并进行清理。
不过,我在某些作业中不断收到间歇性错误,通常其中 1 个作业有错误(每次都不同),但偶尔也会有 2 个相同的错误或零。据我所知,导致错误的不是作业中的代码,而是multiprocessing 或logging 设置中的某些内容。这是我遇到的错误的一个示例:
--- Logging error ---
Traceback (most recent call last):
File "/usr/lib64/python3.4/logging/handlers.py", line 1347, in emit
self.enqueue(self.prepare(record))
File "/usr/lib64/python3.4/logging/handlers.py", line 1313, in enqueue
self.queue.put_nowait(record)
File "<string>", line 2, in put_nowait
File "/usr/lib64/python3.4/multiprocessing/managers.py", line 731, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/lib64/python3.4/multiprocessing/connection.py", line 206, in send
self._send_bytes(ForkingPickler.dumps(obj))
File "/usr/lib64/python3.4/multiprocessing/connection.py", line 413, in _send_bytes
self._send(chunk)
File "/usr/lib64/python3.4/multiprocessing/connection.py", line 369, in _send
n = write(self._handle, buf)
TypeError: an integer is required (got type NoneType)
Call stack:
File "./sampling__test__py.py", line 100, in <module>
run_pool = multiprocessing.Pool(4)
File "/usr/lib64/python3.4/multiprocessing/context.py", line 118, in Pool
context=self.get_context())
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 168, in __init__
self._repopulate_pool()
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 233, in _repopulate_pool
w.start()
File "/usr/lib64/python3.4/multiprocessing/process.py", line 105, in start
self._popen = self._Popen(self)
File "/usr/lib64/python3.4/multiprocessing/context.py", line 267, in _Popen
return Popen(process_obj)
File "/usr/lib64/python3.4/multiprocessing/popen_fork.py", line 21, in __init__
self._launch(process_obj)
File "/usr/lib64/python3.4/multiprocessing/popen_fork.py", line 77, in _launch
code = process_obj._bootstrap()
File "/usr/lib64/python3.4/multiprocessing/process.py", line 254, in _bootstrap
self.run()
File "/usr/lib64/python3.4/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "/home/username/value_check.py", line 338, in value_check
global_logger.info("SplitTime: {str_timeDelta} -- COMPLETED: {Check_Name} --- Total Txn Count: {var_Total_Txn_Count} --- Criteria Txn Count: {var_Criteria_Txn_Count} --- Threshold: {Threshold} --- Low_Vol Threshold: {LowVolThresh}".format(str_timeDelta = timeDelta(datetime.now() - YAML_Config['start_time']), **YAML_Config))
Message: 'SplitTime: 00:01:05,031 -- COMPLETED: ALPHA_CHECK --- Total Txn Count: 1234--- Criteria Txn Count: 0 --- Threshold: 10 --- Low_Vol Threshold: 0'
Arguments: None
代码中的错误指的是我的代码中的日志记录对象,但即使我在调用周围放置try/except 逻辑,它也没有做任何事情,错误似乎发生在上游。我还尝试将记录的内容从格式化的字符串更改为简单的字符串,但无济于事。似乎在某个地方,个别工作要么失去与Queue 的联系,要么Queue 中的某些东西失败并导致问题。
有什么想法吗?我一直在努力获得一个更新版本的 Python 可用,这将有很多原因(特别是 f 字符串),但我不知道这是否能解决这个问题,而且我已经用完了故障排除思路。
【问题讨论】:
-
是的,这是有道理的,是代码失败了,而不是我的代码中的实际日志调用。有什么想法首先为什么会发生这种异常?
标签: python python-3.x logging multiprocessing