【问题标题】:recv_pyobj behaves different from Thread to Processrecv_pyobj 的行为从线程到进程不同
【发布时间】:2016-09-22 12:16:48
【问题描述】:


为了性能起见,我试图一次从 6 个不同进程上的套接字读取数据。我做了一个打开 6 个线程的测试,并从每个线程中读取一个套接字,并在打开 6 个子进程的另一个测试中读取一个不同的套接字。线程阅读工作正常,看起来像这样:

class ZMQServer:
    context = zmq.Context()
    socket = None
    ZMQthread = None

    def __init__(self, port, max_size):
        self.socket = self.context.socket(zmq.SUB)
        self.socket.setsockopt(zmq.SUBSCRIBE, '')
        self.socket.connect("tcp://127.0.0.1:" + str(port))

    def StartAsync(self):
        ZMQthread = threading.Thread(target=self.Start)
        ZMQthread.start()

   def Start(self):
        print "ZMQServer:Wait for next request from client on port: %d" % self.port
        while True:
            print "Running another loop"
            try: 
                message = self.socket.recv_pyobj()
            except: 
                print "ZMQServer:Error receiving messages"

if __name__ == '__main__':
    zmqServers = [None] * 6
    for idx in range (0, 6):
        zmqServers[idx] = ZMQServer(DTypes.PORTS_RECREGISTER[idx], 1024)
        zmqServers[idx].StartAsync()

这将显示:

ZMQServer:等待来自端口的客户端的下一个请求:4994
运行另一个循环
ZMQServer:等待来自端口的客户端的下一个请求:4995
运行另一个循环
ZMQServer:等待来自端口的客户端的下一个请求:4996
运行另一个循环
ZMQServer:等待来自端口的客户端的下一个请求:4997
运行另一个循环
ZMQServer:等待来自端口的客户端的下一个请求:4998
运行另一个循环
ZMQServer:等待来自端口的客户端的下一个请求:4999

重要提示:我通过套接字接收数据并发送它。
现在,我需要实现相同的行为,但只使用进程而不是线程,因此八核将使用更多的处理器。代码如下所示:

context = zmq.Context()
def CreateSocket(port):
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.SUBSCRIBE, '')
    socket.connect("tcp://127.0.0.1:" + str(port))

def Listen(socket, port):
    print "ZMQServer:Wait for next request from client on port: %d" % port
    while True:
        print "Running another loop"
        try: 
            message = socket.recv_pyobj()
            print "ZMQServer:Received request: %s" % message
        except: 
            print "ZMQServer:Error receiving messages"
            continue

  #I'm trying first only with 1 Process - 1 socket:

   if __name__ == '__main__':
        port = DTypes.PORTS_RECREGISTER[0]
        socket = CreateSocket(port)
        proc = Process(target=Listen, args=(socket, port))
        proc.start()
        proc.join()

输出很奇怪:

ZMQServer:等待来自端口的客户端的下一个请求:4994
运行另一个循环
ZMQServer:接收消息时出错
运行另一个循环
ZMQServer:接收消息时出错
运行另一个循环
ZMQServer:接收消息时出错
......

非常重要的是,当我发送数据时,我没有在套接字上收到数据
所以,据我所知:
1. 方法进入并在每个while循环上更改套接字?
2. 或者 recv_pyobj 不再阻塞?
以前有人经历过吗?有人知道如何正确地进行多进程套接字读取吗?
谢谢

【问题讨论】:

    标签: python multithreading sockets subprocess


    【解决方案1】:

    实际上,问题在于将 SOCKET 类型参数传递给进程/线程。似乎酸洗,在幕后序列化传递给回调方法的数据并不能很好地序列化 SOCKET 参数。
    所以,我改变了它,所以我不会传递套接字参数:

    context = zmq.Context()
    
    def ListeToSocket(port):
        socket = context.socket(zmq.SUB)
        socket.setsockopt(zmq.SUBSCRIBE, '')
        socket.connect("tcp://127.0.0.1:" + str(port))
        print "Wait for data on port: %d" % port
        while True:
            try:
                message = socket.recv_pyobj()
                print "PORT:%d Received data: %s" % (port, message)
            except:
                print "PORT:%d: Error receiving messages" % port
    
    if __name__ == '__main__':
        for idx in range(0, DTypes.SOCKET_MAX):
            proc1 = Process(target=ListeToSocket, args=(DTypes.PORTS_RECREGISTER[idx], ))
            proc1.start()
    

    【讨论】:

      猜你喜欢
      • 2012-04-05
      • 1970-01-01
      • 2011-05-18
      • 2021-07-29
      • 1970-01-01
      • 2018-10-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多