【问题标题】:Python multiprocessing.Queue stalls when a ctype object is passedPython multiprocessing.Queue 在传递 ctype 对象时停止
【发布时间】:2020-12-01 13:46:03
【问题描述】:

以下程序执行以下操作:

  1. 父进程创建数据类型SHARED_DTYPE的进程间共享值
  2. 父进程创建进程间队列以将对象从子进程传递给父进程。
  3. 父进程产生子进程(并通过进程间队列等待对象到达)。
  4. 子进程修改进程间共享值的值
  5. 子进程创建一个数据类型为TRAVELLER_DTYPE的对象
  6. 子进程通过进程间队列传递创建的对象。
  7. 父进程通过进程间队列接收对象。
from multiprocessing import Value, Process, Queue
import ctypes

SHARED_DTYPE = ctypes.c_int
TRAVELLER_DTYPE = ctypes.c_float

shared_value = Value(SHARED_DTYPE, 0)
print('type of shared_value =', type(shared_value))
print('shared_value =', shared_value.value)

def child_proc():
    try:
        shared_value.value = 1
        obj = TRAVELLER_DTYPE(5)
        print('send into queue =', obj)
        q.put(obj)
    except BaseException as e:
        print(e)
    finally:
        print('child_proc process is finished')

if __name__ == "__main__":
    try:
        q = Queue()
        cp = Process(target=child_proc)
        cp.start()
        cp.join()

        print('shared_value =', shared_value.value)
        obj = q.get()
        print('recv from queue =', obj)
    except BaseException as e:
        print(e)
    finally:
        print('__main__ process is finished')

现在,如果上面的程序运行,它可以正常工作,输出如下:

type of shared_value = <class 'multiprocessing.sharedctypes.Synchronized'>
shared_value = 0
send into queue = c_float(5.0)
child_proc process is finished
shared_value = 1
recv from queue = c_float(5.0)
__main__ process is finished

但是如果我们将程序顶部的TRAVELLER_DTYPE 更改为ctypes.c_int,它就不再正常工作了。

有时,它会给出以下输出:

type of shared_value = <class 'multiprocessing.sharedctypes.Synchronized'>
shared_value = 0
send into queue = c_int(5)
child_proc process is finished
shared_value = 1
^C                               <-- Pressed ctrl-C here, was hung indefinitely.
__main__ process is finished

而其他时候,它会给出以下输出:

type of shared_value = <class 'multiprocessing.sharedctypes.Synchronized'>
shared_value = 0
send into queue = c_int(5)
child_proc process is finished
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/usr/lib/python3.8/multiprocessing/sharedctypes.py", line 129, in reduce_ctype
    assert_spawning(obj)
  File "/usr/lib/python3.8/multiprocessing/context.py", line 359, in assert_spawning
    raise RuntimeError(
RuntimeError: c_int objects should only be shared between processes through inheritance
shared_value = 1
^C                            <-- Pressed ctrl-C here, was hung indefinitely.
__main__ process is finished

为什么?

一般来说,当且仅当SHARED_DTYPE != TRAVELLER_DTYPE

是否需要一些显式锁定对象?


Python 多处理 doc page 没有提到任何此类问题。

在线搜索错误消息并没有提供任何相关信息/线索:

【问题讨论】:

    标签: python multiprocessing queue ctypes


    【解决方案1】:

    奇怪的是,当两种类型不同时它会起作用,但当它们相同时会失败。提到的错误报告看起来相关但陈旧。这似乎是一个错误。一种解决方法是,与 Value 对象不同,Queue 对象不需要(也许不应该)是ctypes 类型,因此您可以改用intfloat 并且它可以工作.

    我假设您在 Linux 上运行,但在 Windows 上它使用进程的生成与分叉,并且在生成时脚本被导入到子进程中,从而使全局变量在进程之间成为不同的实例。这甚至会使您的“工作”场景在 Windows 上失败。相反,队列和共享值应该作为参数传递给子工作者,确保它们作为 same 对象正确继承(这可能是错误消息所指的内容)。

    下面我还重新安排了代码以用于生成,因此它可以在 Windows 和 Linux 上运行:

    from multiprocessing import Value, Process, Queue
    import ctypes
    
    SHARED_DTYPE = ctypes.c_int
    TRAVELLER_DTYPE = int
    
    def child_proc(q,shared_value):
        shared_value.value = 1
        obj = TRAVELLER_DTYPE(5)
        print('send into queue =', obj)
        q.put(obj)
        print('child_proc process is finished')
    
    if __name__ == "__main__":
        shared_value = Value(SHARED_DTYPE, 0)
        print('type of shared_value =', type(shared_value))
        print('shared_value =', shared_value.value)
        q = Queue()
        cp = Process(target=child_proc,args=(q,shared_value))
        cp.start()
        cp.join()
    
        print('shared_value =', shared_value.value)
        obj = q.get()
        print('recv from queue =', obj)
        print('__main__ process is finished')
    
    type of shared_value = <class 'multiprocessing.sharedctypes.Synchronized'>
    shared_value = 0
    send into queue = 5
    child_proc process is finished
    shared_value = 1
    recv from queue = 5
    __main__ process is finished
    

    【讨论】:

    • 嗨@MarkTolonen,感谢您的回答。事实上,我正在运行 Linux,代码不需要支持任何其他平台,仍然感谢您指出问题,同样在我的实际程序中,TRAVELLER_DTYPE = bytearraySHARED_DTYPE = c_int 当前将 c_int 更改为 c_uint 解决了与字节数组。我想对此问题进行更多调查,但遗憾的是时间不允许。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-02-19
    • 2013-05-08
    • 2019-06-05
    • 2012-06-23
    • 2020-03-30
    • 2020-01-02
    相关资源
    最近更新 更多