【问题标题】:How to share a global variable with another script in multiprocessing?如何在多处理中与另一个脚本共享全局变量?
【发布时间】:2021-07-07 16:57:22
【问题描述】:

问题:如何在 script2 中使用变量 x? 我有 2 个脚本,其中第一个包含 2 个多处理函数,第二个包含 1 个多处理函数。如何为所有 3 个多处理函数使用共享变量?

script1.py

from script2 import function3
x = None
def function1():
    """Read lines from stdin forever, rebinding the module-level ``x`` each time."""
    # NOTE(review): this runs as a Process target; per the discussion below,
    # processes do not share plain variables, so presumably only this
    # process's own ``x`` is updated.
    global x
    while True:
        x = input()  # updates global variable x

def function2():
    """Print the module-level ``x`` in an endless loop."""
    # NOTE(review): nothing in this function assigns ``x``; it prints
    # whatever value this process's module-level ``x`` holds.
    global x
    while True:
        print(x)     # prints global variable x

# NOTE(review): no ``import multiprocessing`` appears in the script1.py lines
# shown here.
# One process per function; function3 is the one imported from script2.
p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p3 = multiprocessing.Process(target=function3)
# Start all three processes, then block until each of them exits.
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()

# some condition to stop all processes

script2.py

def function3():
    """Print ``x*2`` in an endless loop."""
    # NOTE(review): ``x`` is not defined anywhere in the script2.py lines
    # shown; this is the variable the question wants to share.
    while True:      
        print(x*2)   # prints global variable x*2

【问题讨论】:

  • 进程不能共享变量;您需要某种进程间通信。最简单的方法可能是一个文本文件,但它可能非常脆弱; SQLite 数据库可能更健壮。
  • 您可以使用multiprocessing.managers.SyncManager有效地共享某些类型变量的值。如果您在这个网站上搜索,会有很多关于这样做的问题。

标签: python multiprocessing python-multiprocessing


【解决方案1】:

以下是根据 @martineau 的评论编写的示例,演示如何创建一个共享的托管字符串值。

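在 Linux 等默认使用 fork 创建新进程的平台上,可以这样编写代码(注意:从 Python 3.14 起,Linux 上的默认启动方式已改为 forkserver,此时需要先调用 multiprocessing.set_start_method('fork'),或者改用下面的 spawn 写法):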

import multiprocessing
from ctypes import c_char_p

# Manager-backed string value; the children read and write it via ``s.value``.
manager = multiprocessing.Manager()
s = manager.Value(c_char_p, '')
# Flag used to announce that ``s`` has been given a new value.
event = multiprocessing.Event()

def function1():
    """Store a new string in the shared value ``s``, then signal ``event``."""
    s.value = 'New value'
    # Signal only after the value is in place, so a waiter reads the new one.
    event.set()

def function2():
    """Block until ``event`` is set, then print the shared value ``s``."""
    event.wait()
    # The event being set means the writer has already stored its value.
    print(s.value)

# One process per worker function: start them all, then wait for them all.
processes = [
    multiprocessing.Process(target=worker)
    for worker in (function1, function2)
]
for process in processes:
    process.start()
for process in processes:
    process.join()

打印:

New value

在 Windows 等平台上,spawn 用于创建新进程,共享字符串作为参数传递给进程,以确保仅创建字符串的一个实例。

import multiprocessing
from ctypes import c_char_p

def function1(s, event):
    """Store a new string in ``s.value``, then set ``event``.

    Args:
        s: Object exposing a writable ``value`` attribute.
        event: Event-like object; ``set()`` is called once the value is stored.
    """
    s.value = 'New value'
    # Signal only after the value is in place, so a waiter reads the new one.
    event.set()

def function2(s, event):
    """Block until ``event`` is set, then print ``s.value``.

    Args:
        s: Object exposing a readable ``value`` attribute.
        event: Event-like object that is waited on before reading.
    """
    event.wait()
    # The event being set means the writer has already stored its value.
    print(s.value)

# Needed on Windows: only the script run as the main module may create the
# shared objects and start the processes.
if __name__ == '__main__':
    s = multiprocessing.Manager().Value(c_char_p, '')
    event = multiprocessing.Event()
    shared = (s, event)
    # Both workers receive the very same shared objects as arguments.
    processes = [
        multiprocessing.Process(target=worker, args=shared)
        for worker in (function1, function2)
    ]
    for process in processes:
        process.start()
    for process in processes:
        process.join()

打印:

New value

上面的 if __name__ == '__main__': 检查是必需的,否则会陷入无限递归:新创建的进程会从头开始执行源代码,如果没有该检查,就会无限地创建新进程。出于同样的原因,s 和 event 的定义不能放在该检查之外,否则每个新创建的进程都会创建自己的这些变量的实例。但这意味着现在必须将这些变量作为参数传递,而在 fork 示例中它们可以直接被继承。

更新:在 Linux/Unix 上创建共享 numpy 数组

import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    """Return a NumPy array of ``shape`` backed by ``shared_array``.

    Args:
        shared_array: Flat ctypes array (e.g. a lock-free shared memory Array).
        shape: Shape to give the resulting array.

    Returns:
        The array obtained from ``np.ctypeslib.as_array``, reshaped to ``shape``.
    """
    return np.ctypeslib.as_array(shared_array).reshape(shape)

def to_shared_array(arr, ctype):
    """Copy ``arr`` into a new lock-free ``multiprocessing.Array`` of ``ctype``.

    Args:
        arr: NumPy array whose elements are copied in C (row-major) order.
        ctype: ctypes element type of the shared buffer, e.g. ``ctypes.c_int32``.

    Returns:
        The flat shared array (created with ``lock=False``) holding
        ``arr.size`` elements.
    """
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    # View the buffer with the element type it was allocated with rather than
    # ``arr.dtype``: when the two differ, reinterpreting the bytes as
    # ``arr.dtype`` yields a view of the wrong length or garbage values.
    view = np.ctypeslib.as_array(shared_array)
    view[:] = arr.ravel(order='C')
    return shared_array

# Start from an all-zero 2x3 int32 matrix copied into shared memory.
shape = (2, 3)
shared_array = to_shared_array(np.zeros(shape, dtype=np.int32), ctypes.c_int32)
# From here on ``arr`` must be the array built on top of the shared buffer.
arr = to_numpy_array(shared_array, shape)
# Flag used to announce that ``arr`` has been updated.
event = multiprocessing.Event()

def function1():
    """Set every element of the shared 2-D ``arr`` to 1, then signal ``event``."""
    # One vectorized assignment over the same index range the nested
    # ``for x`` / ``for y`` loops covered, instead of writing element by
    # element from Python.
    arr[:shape[0], :shape[1]] = 1
    event.set() # show we have a new value

def function2():
    """Block until ``event`` is set, then print the module-level ``arr``."""
    event.wait()
    # The event being set means the writer has finished updating ``arr``.
    print('arr =', arr)

# One process per worker function: start them all, then wait for them all.
workers = [
    multiprocessing.Process(target=task)
    for task in (function1, function2)
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
# Print the array again from the parent process.
print('arr =', arr)

打印:

arr = [[1 1 1]
 [1 1 1]]
arr = [[1 1 1]
 [1 1 1]]

在 Windows 上创建共享 numpy 数组

import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    """Return a NumPy array of ``shape`` backed by ``shared_array``.

    Args:
        shared_array: Flat ctypes array (e.g. a lock-free shared memory Array).
        shape: Shape to give the resulting array.
    """
    return np.ctypeslib.as_array(shared_array).reshape(shape)

def to_shared_array(arr, ctype):
    """Copy ``arr`` into a new lock-free ``multiprocessing.Array`` of ``ctype``.

    Args:
        arr: NumPy array whose elements are copied in C (row-major) order.
        ctype: ctypes element type of the shared buffer, e.g. ``ctypes.c_int32``.

    Returns:
        The flat shared array holding ``arr.size`` elements.
    """
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    # View the buffer with the element type it was allocated with rather than
    # ``arr.dtype``, so a dtype/ctype mismatch cannot misread the bytes.
    view = np.ctypeslib.as_array(shared_array)
    view[:] = arr.ravel(order='C')
    return shared_array

def function1(shared_array, shape, event):
    """Fill the shared array with ones, then signal ``event``.

    Args:
        shared_array: Shared ctypes array created by ``to_shared_array``.
        shape: Shape used to view ``shared_array`` as a NumPy array.
        event: Event set once the array has been updated.
    """
    # A NumPy array handed to ``Process(args=...)`` is pickled under spawn, so
    # the child would only fill a private copy. Rebuild the view from the
    # shared buffer in this process instead.
    arr = to_numpy_array(shared_array, shape)
    arr[...] = 1
    event.set() # show we have a new value

def function2(shared_array, shape, event):
    """Wait for ``event``, then print the shared array.

    Args:
        shared_array: Shared ctypes array created by ``to_shared_array``.
        shape: Shape used to view ``shared_array`` as a NumPy array.
        event: Event waited on before the array is printed.
    """
    # Rebuild the view locally for the same reason as in function1.
    arr = to_numpy_array(shared_array, shape)
    event.wait() # wait for new arr value
    print('arr =', arr)

if __name__ == '__main__':
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # You have to now use the shared array as the base:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()

    # Pass the shared ctypes array (not the NumPy view) so that every process
    # works on the same memory.
    p1 = multiprocessing.Process(target=function1, args=(shared_array, shape, event))
    p2 = multiprocessing.Process(target=function2, args=(shared_array, shape, event))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('arr =', arr)

在 Windows 上通过多处理池使用共享 numpy 数组

当使用多处理池时,无论是将数组作为参数传递给工作函数,还是在本例中使用它为池中的每个进程初始化一个全局变量,都必须将共享数组传递给每个进程并从中重新创建一个 numpy 数组。

import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    """Return a NumPy array of ``shape`` backed by ``shared_array``.

    Args:
        shared_array: Flat ctypes array (e.g. a lock-free shared memory Array).
        shape: Shape to give the resulting array.

    Returns:
        The array obtained from ``np.ctypeslib.as_array``, reshaped to ``shape``.
    """
    return np.ctypeslib.as_array(shared_array).reshape(shape)

def to_shared_array(arr, ctype):
    """Copy ``arr`` into a new lock-free ``multiprocessing.Array`` of ``ctype``.

    Args:
        arr: NumPy array whose elements are copied in C (row-major) order.
        ctype: ctypes element type of the shared buffer, e.g. ``ctypes.c_int32``.

    Returns:
        The flat shared array (created with ``lock=False``) holding
        ``arr.size`` elements.
    """
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    # View the buffer with the element type it was allocated with rather than
    # ``arr.dtype``: when the two differ, reinterpreting the bytes as
    # ``arr.dtype`` yields a view of the wrong length or garbage values.
    view = np.ctypeslib.as_array(shared_array)
    view[:] = arr.ravel(order='C')
    return shared_array

def init_pool(shared_array, the_shape, the_event):
    """Pool initializer: publish the shared state as globals of this process.

    Args:
        shared_array: Shared ctypes array to build the NumPy array from.
        the_shape: Shape stored in the global ``shape`` and used for ``arr``.
        the_event: Event stored in the global ``event``.
    """
    global arr, shape, event
    event, shape = the_event, the_shape
    # The NumPy array has to be rebuilt here from the shared array.
    arr = to_numpy_array(shared_array, the_shape)

def function1():
    """Set every element of the shared 2-D ``arr`` to 1, then signal ``event``."""
    # One vectorized assignment over the same index range the nested
    # ``for x`` / ``for y`` loops covered, instead of writing element by
    # element from Python.
    arr[:shape[0], :shape[1]] = 1
    event.set() # show we have a new value

def function2():
    """Block until ``event`` is set, then print the module-level ``arr``."""
    event.wait()
    # The event being set means the writer has finished updating ``arr``.
    print('arr =', arr)

if __name__ == '__main__':
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # You have to now use the shared array as the base:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()
    pool = multiprocessing.Pool(2, initializer=init_pool, initargs=(shared_array, shape, event))
    # Keep the AsyncResult handles: a discarded handle silently hides any
    # exception raised inside a worker.
    results = [pool.apply_async(function1), pool.apply_async(function2)]
    # wait for tasks to complete
    pool.close()
    pool.join()
    for result in results:
        result.get()  # re-raises the exception of a failed task, if any
    print('arr =', arr)

在 Linux/Unix 上使用带有多处理池的共享 numpy 数组

import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    """Return a NumPy array of ``shape`` backed by ``shared_array``.

    Args:
        shared_array: Flat ctypes array (e.g. a lock-free shared memory Array).
        shape: Shape to give the resulting array.

    Returns:
        The array obtained from ``np.ctypeslib.as_array``, reshaped to ``shape``.
    """
    return np.ctypeslib.as_array(shared_array).reshape(shape)

def to_shared_array(arr, ctype):
    """Copy ``arr`` into a new lock-free ``multiprocessing.Array`` of ``ctype``.

    Args:
        arr: NumPy array whose elements are copied in C (row-major) order.
        ctype: ctypes element type of the shared buffer, e.g. ``ctypes.c_int32``.

    Returns:
        The flat shared array (created with ``lock=False``) holding
        ``arr.size`` elements.
    """
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    # View the buffer with the element type it was allocated with rather than
    # ``arr.dtype``: when the two differ, reinterpreting the bytes as
    # ``arr.dtype`` yields a view of the wrong length or garbage values.
    view = np.ctypeslib.as_array(shared_array)
    view[:] = arr.ravel(order='C')
    return shared_array

# Start from an all-zero 2x3 int32 matrix copied into shared memory.
shape = (2, 3)
shared_array = to_shared_array(np.zeros(shape, dtype=np.int32), ctypes.c_int32)
# From here on ``arr`` must be the array built on top of the shared buffer.
arr = to_numpy_array(shared_array, shape)
# Flag used to announce that ``arr`` has been updated.
event = multiprocessing.Event()

def function1():
    """Set every element of the shared 2-D ``arr`` to 1, then signal ``event``."""
    # One vectorized assignment over the same index range the nested
    # ``for x`` / ``for y`` loops covered, instead of writing element by
    # element from Python.
    arr[:shape[0], :shape[1]] = 1
    event.set() # show we have a new value

def function2():
    """Block until ``event`` is set, then print the module-level ``arr``."""
    event.wait()
    # The event being set means the writer has finished updating ``arr``.
    print('arr =', arr)

pool = multiprocessing.Pool(2)
# Keep the AsyncResult handles: a discarded handle silently hides any
# exception raised inside a worker.
results = [pool.apply_async(function1), pool.apply_async(function2)]
# wait for tasks to complete
pool.close()
pool.join()
for result in results:
    result.get()  # re-raises the exception of a failed task, if any
print('arr =', arr)

【讨论】:

  • 我几乎得到了答案,但我希望将共享变量作为 NumPy 数组而不是字符串变量。 “ctype”是否支持 NumPy 共享变量?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-09-20
  • 1970-01-01
  • 2016-11-14
  • 1970-01-01
  • 2019-12-11
相关资源
最近更新 更多