【问题标题】:Multithreading - Alternating between two threads using Conditions and Events in Python多线程 - 在 Python 中使用条件和事件在两个线程之间交替
【发布时间】:2013-11-14 19:44:20
【问题描述】:

我正在尝试编写一个程序,我希望使用它在两个线程之间交替,线程1 和线程2。棘手的部分是线程应该首先开始执行必须是thread1。 这是我到目前为止的代码:

Class Client:
#member variables
def sendFile(self,cv1,lock1):

        sent=0;
        while (i<self.size):
            message = self.data[i:1024+i]
            cv1.acquire()
            BadNet.transmit(self.clientSocket,message,self.serverIP,self.serverPort)
            cv1.notify() 
            cv1.release()

            i = i+1024
            sent+=1
            lock1.wait()

        print "File sent successfully !"   
        self.clientSocket.close()

    def receiveAck(self,cv1,lock2):
        i=0
        while (1):
            lock1.clear()
            cv1.acquire()
            cv1.wait()
            print "\nentered ack !\n"
            self.ack, serverAddress = self.clientSocket.recvfrom(self.buf)

            cv1.release()
            lock1.set()


if __name__ == "__main__":
    lock1 = Event()
    cv1 = Condition()
    cv2= Condition()
    client = Client();
    client.readFile();

    thread1 = Thread(target = client.sendFile, args=[cv1,lock1])
    thread2 = Thread(target = client.receiveAck, args=[cv1,lock1])

    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

我目前面临的问题是,最初程序确实在两个线程之间交替(由控制台上的输出确认。但是在任意数量的迭代(通常在 20 到 80 之间)之后,程序只是挂起并且没有进一步的迭代被执行。

【问题讨论】:

  • 我的第一个问题是,你为什么要这个?如果两个线程只是显式地交换控制权,那么拥有两个线程有​​什么意义呢?使用生成器可以更轻松地做到这一点?
  • 为了便于理解,我只是简化了我的程序。实际上发生的事情是 thread1 使用 UDP 套接字发送数据包,而 thread2 使用相同的套接字接收该数据包的确认。除非已收到对先前数据包的确认,否则不应继续执行 Thread1。我希望你明白我的意思,但即使你不明白,我也会要求你忽略我在两个线程之间交替的目的,并帮助解决我面临的问题。
  • 听起来相当顺序...为什么要使用线程而不是简单地使用阻塞 IO?阻塞,直到你得到你的 ACK,然后在循环中移动到下一个数据包,为什么是线程?
  • 一个明显的问题是您没有在服务员端使用条件。您必须循环检查条件并在 acquire/release 对中调用 cv1.wait(),或者您只是将 cv 用作损坏的事件+锁定组合。更重要的是,您首先看不到有任何条件可以使用 cv 进行保护。例如,如果套接字是非阻塞的,并且您检查了 recvfrom 没有准备好,那将是用 cv 保护的东西,但您没有这样做。
  • 另外,给我们一个可以调试的完全运行的例子(SSCCE)会让事情变得更容易;与其猜测代码中可能有什么问题,我们可以弄清楚是什么错误。

标签: python multithreading events synchronization


【解决方案1】:

您的同步至少有两个问题。

首先,您使用 cv1 错误。您的接收线程必须在其 cv 周围循环,检查条件并每次调用 wait。否则,您只是将 cv 用作损坏的事件 + 锁定组合。你没有这样的循环。更重要的是,你连等待的条件都没有。

其次,您使用 lock1 错误。您的接收线程设置事件,然后立即清除它。但是不能保证发送线程已经进入等待状态。 (上一个问题的竞争使这个问题更加严重,但即使你修复它仍然是一个问题。)在多核机器上,它通常会及时到达那里,但是“通常”在线程编程中比从不更糟糕。因此,最终发送线程将在接收线程完成清除后等待,因此它将永远等待。同时,接收线程将等待发送线程的通知,这永远不会发生。所以你陷入了僵局。

为了将来参考,在每个阻塞操作之前和之后添加 print 语句,尤其是同步操作,这会增加调试的难度:你会看到接收线程的最后一条消息是“receive waiting on cv1”,而发送线程的最后一条消息是“send waiting on lock1”,很明显死锁在哪里。


无论如何,我不确定“修复”没有条件的 cv 或您试图用作 cv 的事件意味着什么,所以我将展示如何编写一些明智的东西有两个简历。在这种情况下,我们不妨只使用一个来回翻转的标志作为两个 cvs 的条件。

当我在做这件事的时候,我会修复一些其他问题,这些问题使您的代码甚至无法测试(例如,i 从未初始化),并包括调试信息,以及我必须填写的内容将此作为一个完整的示例,否则我会尽量保留您的结构和不相关的问题(例如 Client 是一个旧式类)。

class Client:
    def __init__(self):
        self.clientSocket = socket(AF_INET, SOCK_DGRAM)
        self.serverIP = '127.0.0.1'
        self.serverPort = 11111
        self.buf = 4
        self.waitack = False

    def readFile(self):
        self.data = ', '.join(map(str, range(100000)))
        self.size = len(self.data)

    #member variables
    def sendFile(self,cv1,lock1):
        i = 0
        sent=0
        while (i<self.size):
            message = self.data[i:1024+i]
            print "s cv1 acquire"
            with cv1:
                print "s sendto"
                self.clientSocket.sendto(message, (self.serverIP, self.serverPort))
                self.waitack = True
                print "s cv1 notify"
                cv1.notify() 

            i = i+1024
            sent+=1

            print "s cv2 acquire"
            with cv2:
                print "s cv2 wait"
                while self.waitack:
                    cv2.wait()

        print "File sent successfully !"   
        self.clientSocket.close()

    def receiveAck(self,cv1,lock2):
        i=0
        while (1):
            print "r cv1 acquire"
            with cv1:
                while not self.waitack:
                    print "r cv1 wait"
                    cv1.wait()
            print "r recvfrom"
            self.ack, serverAddress = self.clientSocket.recvfrom(self.buf)
            i += 1
            print self.ack, i            

            print "r cv2 acquire"
            with cv2:
                self.waitack = False
                print "r cv2 notify"
                cv2.notify()

这里有一个测试服务器:

from itertools import *
from socket import *

s = socket(AF_INET, SOCK_DGRAM)
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind(('127.0.0.1', 11111))

for i in count():
    data, addr = s.recvfrom(1024)
    print(i)
    s.sendto('ack\n', addr)

启动服务器,启动客户端,服务器将计数到 672,客户端将计数到 673(因为您的代码从 1 开始计数)有 673 对平衡的消息和“文件发送成功!”在末尾。 (当然客户端会永远挂起,因为receiveAck 没有办法完成,而服务器因为我把它写成一个无限循环。)

【讨论】:

  • 非常感谢您的回答。它完美地工作,就像我打算对我的原始代码做的那样。我很抱歉没有提供功能齐全的代码,下次我会小心的。至于使用线程执行相当顺序的任务的问题,这实际上是我正在处理的这项任务的要求!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-04-22
  • 2022-11-26
  • 2022-11-26
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多