【问题标题】:How do read and writes work with a manager in Python?Python 中的管理器如何读写?
【发布时间】:2021-10-17 14:40:57
【问题描述】:

对不起,如果这是一个愚蠢的问题,但我无法理解管理器在 python 中的工作方式。

假设我有一个包含要在所有进程之间共享的字典的管理器。我希望一次只有一个进程写入字典,而其他许多进程从字典中读取。

  1. 这是否可以在没有同步原语的情况下同时发生,或者如果同时发生读/写操作会中断?
  2. 如果我想让多个进程同时写入字典怎么办 - 是允许还是会中断(我知道这可能会导致竞争条件,但它会出错)?
  3. 此外,管理器是按队列的方式处理每个读写事务,一次一个,还是一次处理所有事务?

https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes

【问题讨论】:

  • 1.同时读/写通常是一个问题 - 这就是互斥锁/信号量/等的原因。存在。 2. 使用互斥体,因此只允许一个同时写入者 - 在某些情况下(更改多个变量等),您可能希望在写入发生时排除读取。 3. 不确定管理人员的具体实施方式
  • 感谢您的回复,但我的问题专门针对 python 管理器,以及如果两个进程修改管理器持有的共享对象(或者如果一个进程读取而另一个进程写入)会发生什么。我想知道后台管理人员是否一次处理读/写一个。
  • 看起来很久以前,这个答案似乎表明读取和写入一次只发生一次,但加载结果是同时发生的?这是为了价值,我想知道字典。 eli.thegreenplace.net/2012/01/04/…

标签: python multiprocessing synchronization multiprocessing-manager


【解决方案1】:

这取决于你如何写入字典,即操作是否是原子的

my_dict[some_key] = 9 # this is atomic
my_dict[some_key] += 1 # this is not atomic

因此,像上面第一行代码那样创建新密钥并更新现有密钥是原子操作。但是第二行代码确实是多个操作等价于:

temp = my_dict[some_key]
temp = temp + 1
my_dict[some_key] = temp

因此,如果两个进程并行执行my_dict[some_key] += 1,它们可能会读取temp = my_dict[some_key] 的相同值并将temp 递增到相同的新值,最终结果是字典值只会递增一次.这可以证明如下:

from multiprocessing import Pool, Manager, Lock

def init_pool(the_lock):
    global lock
    lock = the_lock

def worker1(d):
    for _ in range(1000):
        with lock:
            d['x'] += 1

def worker2(d):
    for _ in range(1000):
        d['y'] += 1

if __name__ == '__main__':
    lock = Lock()
    with Manager() as manager, \
    Pool(4, initializer=init_pool, initargs=(lock,)) as pool:
        d = manager.dict()
        d['x'] = 0
        d['y'] = 0
        # worker1 will serialize with a lock
        pool.apply_async(worker1, args=(d,))
        pool.apply_async(worker1, args=(d,))
        # worker2 will not serialize with a lock:
        pool.apply_async(worker2, args=(d,))
        pool.apply_async(worker2, args=(d,))
        # wait for the 4 tasks to complete:
        pool.close()
        pool.join()
        print(d)

打印:

{'x': 2000, 'y': 1162}

更新

就序列化而言:

BaseManager 默认使用 Linux 的套接字和 Windows 的命名管道创建服务器。因此,例如,基本上您对托管字典执行的每个方法都非常类似于通过消息传递实现的远程方法调用。这也意味着服务器也可以完全在不同的计算机上运行。 但是,这些方法调用没有序列化;对象方法本身必须是线程安全的,因为每个方法调用都在一个新线程中运行。

以下是创建我们自己的托管类型并让服务器侦听可能来自不同计算机的请求的示例(尽管在此示例中,客户端在同一台计算机上运行)。客户端跨两个线程在托管对象上调用increment 1000 次,但方法实现不是在锁下完成的,所以当我们全部完成时self.x 的结果值不是 1000。此外,当我们检索x 的值同时通过方法get_x 两次,我们看到两个调用或多或少同时启动:

from multiprocessing.managers import BaseManager
from multiprocessing.pool import ThreadPool
from threading import Event, Thread, get_ident
import time

class MathManager(BaseManager):
    pass

class MathClass:
    def __init__(self, x=0):
        self.x = x

    def increment(self, y):
        temp = self.x
        time.sleep(.01)
        self.x = temp + 1

    def get_x(self):
        print(f'get_x started by thread {get_ident()}', time.time())
        time.sleep(2)
        return self.x

    def set_x(self, value):
        self.x = value

def server(event1, event2):
    MathManager.register('Math', MathClass)
    manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
    manager.start()
    event1.set() # show we are started
    print('Math server running; waiting for shutdown...')
    event2.wait() # wait for shutdown
    print("Math server shutting down.")
    manager.shutdown()

def client():
    MathManager.register('Math')
    manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
    manager.connect()
    math = manager.Math()
    pool = ThreadPool(2)
    pool.map(math.increment, [1] * 1000)
    results = [pool.apply_async(math.get_x) for _ in range(2)]
    for result in results:
        print(result.get())

def main():
    event1 = Event()
    event2 = Event()
    t = Thread(target=server, args=(event1, event2))
    t.start()
    event1.wait() # server started
    client() # now we can run client
    event2.set()
    t.join()

# Required for Windows:
if __name__ == '__main__':
    main()

打印:

Math server running; waiting for shutdown...
get_x started by thread 43052 1629375415.2502146
get_x started by thread 71260 1629375415.2502146
502
502
Math server shutting down.

【讨论】:

  • 我了解比赛条件。我的问题更多的是关于经理如何在幕后处理这个问题。这是否意味着它一次处理所有读取和写入?
  • 啊,这就解释了。谢谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-12-24
  • 2017-05-18
  • 1970-01-01
  • 2021-08-22
  • 2012-02-08
  • 1970-01-01
  • 2022-08-18
相关资源
最近更新 更多