【问题标题】:Producer Consumer using semaphores and mutexes in Python(在 Python 中使用信号量和互斥锁实现生产者-消费者)
【发布时间】:2020-01-21 22:30:20
【问题描述】:

我试图了解如何使用 Python 信号量来实现一个缓冲区大小有界的队列,可供多个生产者和消费者使用。这是我的实现:

class Q:
    """Bounded FIFO ring buffer for multiple producers and multiple consumers.

    Two counting semaphores track capacity: ``open`` counts free slots and
    ``closed`` counts filled slots.  Two locks serialize index updates:
    ``end_lock`` among producers and ``start_lock`` among consumers, so a
    producer and a consumer may work on the buffer at the same time.
    """

    def __init__(self, size):
        """Create an empty queue that holds at most ``size`` items.

        Raises:
            ValueError: if ``size`` is smaller than 1.  Such a queue could
                never accept an item, so ``put`` and ``get`` would both
                block forever.
        """
        if size < 1:
            raise ValueError("size must be at least 1")
        self.buff = [None] * size
        self.end = 0    # index of the next slot to write
        self.start = 0  # index of the next slot to read
        self.size = size
        self.end_lock = Lock()  # protect end from race across multiple producers
        self.start_lock = Lock()  # protect start from race across multiple consumers
        self.open = Semaphore(size)  # block till there's space to produce
        # Starts at zero so consumers block until the first put() completes.
        self.closed = Semaphore(0)  # block till there's item to consume

    def put(self, val):
        """Store ``val`` in the next free slot, blocking while the queue is full."""
        self.open.acquire()
        with self.end_lock:
            # The write stays inside the lock: the slot must be filled before
            # any producer signals ``closed``, otherwise a consumer could be
            # woken for a slot that has not been written yet.
            self.buff[self.end] = val
            self.end = (self.end + 1) % self.size
        self.closed.release()

    def get(self):
        """Remove and return the oldest item, blocking while the queue is empty."""
        self.closed.acquire()
        with self.start_lock:
            # ``start`` is always kept in ``range(size)``, so it indexes directly.
            val = self.buff[self.start]
            self.start = (self.start + 1) % self.size
        self.open.release()
        return val

这个实现没有错误吗?是否可以进一步简化以使用更少的互斥体/信号量?

【问题讨论】:

  • put 中有一个错误:您将值写入索引 1、2、3……,却从索引 0、1、2……读取它们,所以第一个 get 将始终返回 None。
  • 您可能想在get 的末尾使用self.start_lock.release() 而不是acquire。我会考虑使用锁作为上下文管理器,即with self.start_lock:,以避免这种错误。
  • 谢谢@PoByBolek,我已经解决了这些问题

标签: python mutex semaphore python-multithreading producer-consumer


【解决方案1】:

我觉得不错。信号量防止并发的生产者和消费者写入或读取过多,锁防止并发的生产者或消费者同时修改 end 和 start 索引。

这两个信号量是绝对必要的。您可以删除其中一个锁,并在 get 和 put 中共用剩下的那一个锁来同时保护 start 和 end 索引,但这样消费者和生产者就不能同时访问队列了。(CPython 的 queue 实现就是这样做的。)


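我会删除 size 属性,改用 len(self.buff),并将 start 和 end 索引分别重命名为 read_index 和 write_index(两个锁也相应重命名)。另外,我认为您可以在不持有锁的情况下访问缓冲区(因为 lists themselves are thread-safe)。(审校注:这一点只在单个生产者、单个消费者时才安全;有多个生产者或消费者时,一个线程取得索引后、尚未读写对应槽位之前,其他线程可能已经释放了信号量,导致读到尚未写入的槽位,或覆盖尚未读出的值。)代码如下: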

    def put(self, val):
        """Store ``val`` in the next free slot, blocking while the buffer is full.

        The slot is written while ``write_lock`` is still held.  If the write
        happened after the lock was released, a second producer could claim
        the following slot, fill it and release ``closed`` first; a consumer
        woken by that release would then read this producer's slot before it
        had been written.
        """
        self.open.acquire()
        with self.write_lock:
            index = self.write_index
            self.buff[index] = val
            self.write_index = (index + 1) % len(self.buff)
        self.closed.release()

    def get(self):
        """Remove and return the oldest item, blocking while the buffer is empty.

        The slot is read while ``read_lock`` is still held.  If the read
        happened after the lock was released, a second consumer could finish
        first and release ``open``; on a full buffer a producer woken by that
        release would write to this consumer's slot before it had been read,
        losing the original value.
        """
        self.closed.acquire()
        with self.read_lock:
            index = self.read_index
            val = self.buff[index]
            self.read_index = (index + 1) % len(self.buff)
        self.open.release()
        return val

这是我曾经玩过的一个小测试程序:

def producer(queue, start, end, step):
    """Put every value of ``range(start, end, step)`` on ``queue``, in order.

    Prints a completion message once the last value has been handed over.
    """
    sequence = range(start, end, step)
    for item in sequence:
        queue.put(item)

    print('Producer finished')


def consumer(queue, count, result, lock):
    """Take ``count`` items from ``queue`` and merge them into ``result``.

    Items are gathered in a thread-local list first, so ``lock`` is held only
    for the single ``result.update`` call.  Prints a completion message when
    done.
    """
    collected = [queue.get() for _ in range(count)]

    with lock:
        result.update(collected)

    print('Consumer finished')


def main():
    """Stress-test the queue with many concurrent producers and consumers.

    Producer ``i`` puts ``range(i, value_count, producer_count)``, so together
    the producers emit each integer in ``range(value_count)`` exactly once.
    Every consumer takes an equal share and merges it into one shared set.

    Raises:
        ValueError: if the merged set does not contain ``value_count``
            distinct values, i.e. some value was lost or duplicated.
    """
    value_count = 500000
    producer_count = 50
    consumer_count = 50
    # Both counts must divide the workload evenly so every thread finishes.
    assert value_count % producer_count == 0
    assert value_count % consumer_count == 0

    queue = Queue(123)
    result = set()
    lock = Lock()

    producers = []
    for offset in range(producer_count):
        worker = Thread(target=producer, args=(queue, offset, value_count, producer_count))
        producers.append(worker)

    share = value_count // consumer_count
    consumers = []
    for _ in range(consumer_count):
        worker = Thread(target=consumer, args=(queue, share, result, lock))
        consumers.append(worker)

    # Same ordering as before: all producers first, then all consumers.
    workers = producers + consumers
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    collected = len(result)
    if collected != value_count:
        raise ValueError('Result size is %d instead of %d' % (collected, value_count))

# Run the stress test only when executed as a script, not when imported.
if __name__ == '__main__':
    main()

【讨论】:

    【解决方案2】:
    from time import sleep
    from random import randint
    from threading import Thread, Semaphore
    
    # Binary semaphore used as a mutex: every access to the shared state
    # below happens inside ``with s``.
    s = Semaphore(1)
    
    # Ring-buffer cursors and fill level, shared by all threads.
    producer_idx = 0  # next slot a producer writes
    consumer_idx = 0  # next slot a consumer clears
    counter = 0       # number of slots currently holding "x"
    
    buf_size = 10
    buf = [" "] * buf_size  # " " marks an empty slot, "x" a filled one
    print(buf)
    
    def produce():
        """Producer loop: once per second, fill the next slot if there is room.

        All shared state (``buf``, ``producer_idx``, ``counter``) is touched
        only while holding the semaphore ``s``.  The index printed is the
        cursor after it has been advanced, i.e. the slot the next producer
        will write.  Runs forever.
        """
        global producer_idx, counter
        while True:
            with s:
                if counter != buf_size:  # not full
                    buf[producer_idx] = "x"
                    producer_idx = (producer_idx + 1) % buf_size
                    print("{} <= produced 'x' at index='{}'".format(buf, producer_idx))
                    counter = counter + 1
            # Sleep on every pass, including when the buffer was full; skipping
            # the sleep in that case made the thread spin on the semaphore.
            sleep(1)
    
    def consume():
        """Consumer loop: once per second, clear the next slot if one is filled.

        All shared state (``buf``, ``consumer_idx``, ``counter``) is touched
        only while holding the semaphore ``s``.  The index printed is the
        cursor after it has been advanced, i.e. the slot the next consumer
        will clear.  Runs forever.
        """
        global consumer_idx, counter
        while True:
            with s:
                if counter != 0:  # not empty
                    # Capture the item before the slot is cleared and the
                    # cursor moves, so the log reports what was consumed
                    # rather than the content of the following slot.
                    item = buf[consumer_idx]
                    buf[consumer_idx] = " "
                    consumer_idx = (consumer_idx + 1) % buf_size
                    print("{} => consumed '{}' at index='{}'".format(buf, item, consumer_idx))
                    counter = counter - 1
            # Sleep on every pass, including when the buffer was empty; skipping
            # the sleep in that case made the thread spin on the semaphore.
            sleep(1)
    
    # Start a random number (10 to 20, both ends inclusive) of producer threads.
    producers = list()
    
    for i in range(randint(10,20)):
        producer = Thread(target=produce)
        producer.start()
        producers.append(producer)
    
    # Start a random number (10 to 20, both ends inclusive) of consumer threads.
    # NOTE(review): none of the threads is joined in the code shown here, and
    # both worker loops are infinite, so the script presumably runs until it
    # is interrupted.
    consumers = list()
    
    for i in range(randint(10,20)):
        consumer = Thread(target=consume)
        consumer.start()
        consumers.append(consumer)
    
    moi python $ python boundedbuffer_semaphore.py 
    [' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ']
    ['x', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' '] <= produced 'x' at index='1'
    ['x', 'x', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' '] <= produced 'x' at index='2'
    ['x', 'x', 'x', ' ', ' ', ' ', ' ', ' ', ' ', ' '] <= produced 'x' at index='3'
    ['x', 'x', 'x', 'x', ' ', ' ', ' ', ' ', ' ', ' '] <= produced 'x' at index='4'
    ['x', 'x', 'x', 'x', 'x', ' ', ' ', ' ', ' ', ' '] <= produced 'x' at index='5'
    ['x', 'x', 'x', 'x', 'x', 'x', ' ', ' ', ' ', ' '] <= produced 'x' at index='6'
    ['x', 'x', 'x', 'x', 'x', 'x', 'x', ' ', ' ', ' '] <= produced 'x' at index='7'
    ['x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', ' ', ' '] <= produced 'x' at index='8'
    ['x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', ' '] <= produced 'x' at index='9'
    ['x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x'] <= produced 'x' at index='0'
    [' ', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x'] => consumed 'x' at index='1'
    [' ', ' ', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x'] => consumed 'x' at index='2'
    ['x', ' ', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x'] <= produced 'x' at index='1'
    ['x', ' ', ' ', 'x', 'x', 'x', 'x', 'x', 'x', 'x'] => consumed 'x' at index='3'
    ['x', ' ', ' ', ' ', 'x', 'x', 'x', 'x', 'x', 'x'] => consumed 'x' at index='4'
    ['x', 'x', ' ', ' ', 'x', 'x', 'x', 'x', 'x', 'x'] <= produced 'x' at index='2'
    ['x', 'x', 'x', ' ', 'x', 'x', 'x', 'x', 'x', 'x'] <= produced 'x' at index='3'
    ['x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x'] <= produced 'x' at index='4'
    ['x', 'x', 'x', 'x', ' ', 'x', 'x', 'x', 'x', 'x'] => consumed 'x' at index='5'
    ['x', 'x', 'x', 'x', ' ', ' ', 'x', 'x', 'x', 'x'] => consumed 'x' at index='6'
    ['x', 'x', 'x', 'x', 'x', ' ', 'x', 'x', 'x', 'x'] <= produced 'x' at index='5'
    ['x', 'x', 'x', 'x', 'x', ' ', ' ', 'x', 'x', 'x'] => consumed 'x' at index='7'
    ['x', 'x', 'x', 'x', 'x', 'x', ' ', 'x', 'x', 'x'] <= produced 'x' at index='6'
    ['x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x'] <= produced 'x' at index='7'
    ['x', 'x', 'x', 'x', 'x', 'x', 'x', ' ', 'x', 'x'] => consumed 'x' at index='8'
    ['x', 'x', 'x', 'x', 'x', 'x', 'x', ' ', ' ', 'x'] => consumed 'x' at index='9'
    ['x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', ' ', 'x'] <= produced 'x' at index='8'
    ['x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x'] <= produced 'x' at index='9'
    ['x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', ' '] => consumed 'x' at index='0'
    [' ', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', ' '] => consumed 'x' at index='1'
    [' ', ' ', 'x', 'x', 'x', 'x', 'x', 'x', 'x', ' '] => consumed 'x' at index='2'
    [' ', ' ', ' ', 'x', 'x', 'x', 'x', 'x', 'x', ' '] => consumed 'x' at index='3'
    [' ', ' ', ' ', ' ', 'x', 'x', 'x', 'x', 'x', ' '] => consumed 'x' at index='4'
    [' ', ' ', ' ', ' ', ' ', 'x', 'x', 'x', 'x', ' '] => consumed 'x' at index='5'
    [' ', ' ', ' ', ' ', ' ', ' ', 'x', 'x', 'x', ' '] => consumed 'x' at index='6'
    [' ', ' ', ' ', ' ', ' ', ' ', ' ', 'x', 'x', ' '] => consumed 'x' at index='7'
    [' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', 'x', ' '] => consumed 'x' at index='8'
    

    https://github.com/binarytrails/various/blob/master/python/boundedbuffer_semaphore.py

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-10-15
      • 1970-01-01
      • 2015-09-25
      • 1970-01-01
      • 1970-01-01
      • 2014-03-26
      • 1970-01-01
      相关资源
      最近更新 更多