【问题标题】:condition variable not holding wait across two threads条件变量不在两个线程之间保持等待
【发布时间】:2018-02-20 09:49:28
【问题描述】:

对我遇到的这个线程同步感到困惑。 基本上,我正在写入输出缓冲区,并等待条件变量,直到读取缓冲区填充了来自套接字的响应。这是一个非常简单的线程同步。

def write_wait_response(self, buffer, timeout=30):
        '''
            Write and wait for response
            Params:
                Buffer BYTE encoded data
                Timeout timeout to wait for response
            Returns: 
                response str if successful
        '''
        self.buffer = buffer

        if self.waitLock(timeout):
            # condition var was signaled, we can return a response
            readbuf = bytes(self.readbuffer)
            self.readbuffer = b''
            return readbuf
        else:
            print("AsyncClientSocket: No response recieved from {} in {} seconds, dumping buffer".format(
                    self.sa, timeout))
            self.buffer = ''
            raise TimeoutError("AsyncClientSocket Timed Out")

def handle_read(self):
        self.readbuffer, address = self.recvfrom(2048)
        print(self.readbuffer)
        print("notifying")
        self.cond.notifyAll() 

看起来很简单,对吧?有 1 个线程在条件变量上等待,1 个线程(异步异步回调循环)将填充 self.readbuffer 并通知条件变量。更奇怪的是:如果我执行 time.sleep() 而不是使用条件变量,我会在 write_wait_response() 的调用线程上得到一个完全填充的 self.readbuffer。显然这不是我能接受的解决方案。

这是我所期待的:

  1. 调用 write_wait_response(buffer),这将写入缓冲区并等待 在 self.cond 上
  2. asyncore 回调循环调用 handle_write,将字节写入套接字。
  3. 服务器接收字节,写入响应。
  4. 异步回调循环看到套接字上的字节,读入 self.readbuffer,通知 cv
  5. ???????????? write_wait_response 应该取消阻止吗?

控制台输出:

waiting <- thread 1 waiting on CV
AsyncClientSocket: writing 5 bytes <- thread 2: handle_write
b'200,2' <- thread 2: that's the server response
notifying <- thread 2: that's handle_read attempting to notify the held CV

error: uncaptured python exception, closing channel <my_socket_stuff.AsyncClientSocket connected 127.0.0.1:50000 at 0x1051bf438> (<class 'RuntimeError'>:cannot notify on un-acquired lock

注意:在此日志的末尾,线程 1 仍在等待 self.cond。怎么回事?

全班:

class AsyncClientSocket(asyncore.dispatcher):
    def __init__(self, socketargs):
        asyncore.dispatcher.__init__(self)
        family, type, proto, canonname, sa = socketargs
        self.sa = sa
        self.create_socket(family, type)

        if type == socket.SOCK_STREAM:
            self.connect( sa )
        elif type == socket.SOCK_DGRAM:
            pass

        self.buffer = b''
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.readbuffer = b''

    def write_wait_response(self, buffer, timeout=30):
        '''
            Write and wait for response
            Params:
                Buffer BYTE encoded data
                Timeout timeout to wait for response
            Returns: 
                response str if successful
        '''
        self.buffer = buffer

        if self.waitLock(timeout):
            # condition var was signaled, we can return a response
            readbuf = bytes(self.readbuffer)
            self.readbuffer = b''
            return readbuf
        else:
            print("AsyncClientSocket: No response recieved from {} in {} seconds, dumping buffer".format(
                    self.sa, timeout))
            self.buffer = ''
            raise TimeoutError("AsyncClientSocket Timed Out")

    def waitLock(self, timeout):
        '''
            Wait for timeout seconds on CV
        '''
        try:
            self.cond.acquire()
            print("waiting")
            return self.cond.wait(timeout)
        finally:
            self.cond.release()


    def handle_connect(self):
        pass

    def handle_close(self):
        self.close()

    def handle_read(self):
        self.readbuffer, address = self.recvfrom(2048)
        print(self.readbuffer)
        print("notifying")
        self.cond.notifyAll() 

    def writable(self):
        return (len(self.buffer) > 0)

    def handle_write(self):
        print("AsyncClientSocket: writing {} bytes".format(len(self.buffer)))
        self.readbuffer = b''
        sent = self.sendto(self.buffer, self.sa)
        self.buffer = self.buffer[sent:]

【问题讨论】:

  • 尝试一个自包含的完整示例,sscce.org
  • @Jean-PaulCalderone 有一个简洁的例子(不存在)。既然我知道我错误地通知了条件变量,我可以组合一个 SSCCE,但在此之前,我的印象是问题与 asyncore 纠缠不清,以某种方式改变了线程上下文,因此条件变量没有被视为包含在handle_read() 函数。
  • 很多人在理解问题之前已经构建了很多简洁的例子。这是可能的。

标签: python multithreading python-3.x sockets asyncore


【解决方案1】:

想通了。这与异步无关。我只是错误地发出条件变量的信号。 The python3 threading api doc says the calling thread of notify() must acquire the underlying lock 这是有道理的,不希望两个生产者通知同一个条件变量。希望一个在关键部分阻塞,而另一个执行其任务。

def handle_read(self):
    try:
        self.cond.acquire()
        self.readbuffer, address = self.recvfrom(2048)
        print(self.readbuffer)
        self.cond.notify() 
    finally:
        self.cond.release()

【讨论】:

    猜你喜欢
    • 2022-01-25
    • 1970-01-01
    • 1970-01-01
    • 2015-09-18
    • 1970-01-01
    • 1970-01-01
    • 2016-09-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多