【问题标题】:Use multiprocessing to get information from multiple sockets使用多处理从多个套接字获取信息
【发布时间】:2014-11-23 13:49:46
【问题描述】:

我正在用 python (ver 2.7) 开发一个小的 irc 客户端。我曾希望使用多处理来读取我当前连接的所有服务器,但我遇到了问题

import socket
import multiprocessing as mp
import types
import copy_reg
import pickle


def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class a(object):

    def __init__(self):
        sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock1.connect((socket.gethostbyname("example.com"), 6667))
        self.servers = {}
        self.servers["example.com"] = sock1

    def method(self, hostname):
        self.servers[hostname].send("JOIN DAN\r\n")
        print "1"

    def oth_method(self):
        pool = mp.Pool()
        ## pickle.dumps(self.method)
        pool.map(self.method, self.servers.keys())
        pool.close()
        pool.join()

if __name__ == "__main__":
    b = a()
    b.oth_method()

每当它到达pool.map(self.method, self.servers.keys()) 行时,我就会收到错误消息

TypeError: expected string or Unicode object, NoneType found

根据我的阅读,当我尝试腌制不可腌制的东西时会发生这种情况。为了解决这个问题,我首先按照here 的描述制作了_pickle_method_unpickle_method。然后我意识到我(最初)试图传递pool.map() 一个套接字列表(非常不可挑选),所以我将其更改为主机名列表,因为字符串可以被腌制。但是,我仍然收到此错误。

然后我尝试直接在self.methodself.servers.keys()self.servers.keys()[0] 上调用pickle.dumps()。正如预期的那样,后两者效果很好,但从第一个我得到

TypeError: a class that defines __slots__ without defining __getstate__ cannot be pickled.

更多研究将我引向this question,这似乎表明问题出在套接字的使用上(而gnibbler's answer 对该问题似乎证实了这一点)。

有没有一种方法可以让我实际使用多处理?从我(非常简短地)阅读的内容来看,pathos.multiprocessing 可能是我需要的,但如果可能的话,我真的很想坚持使用标准库。

我也不打算使用多处理 - 如果多线程可以更好地工作并避免这个问题,那么我对这些解决方案持开放态度。

【问题讨论】:

  • 您实际上是在尝试将套接字传递给子进程,还是只是您试图避免的意外发生的事情?对于前者,您需要迁移套接字,这必须在比 Python 酸洗更低的级别完成,并且每个平台都不同,因为在幕后,套接字只是文件描述符的包装器,并且您需要操作系统使相同的文件描述符意味着您的子进程中的相同套接字。
  • 同时,您使用多处理而不是多线程是有原因的吗? “从一堆服务器中读取数据”几乎与 I/O 绑定的范例案例一样接近,这正是线程的优点。
  • 首先,“Python 中的线程很慢”是不正确的。 如果你有 CPU 绑定代码,Python 中的线程会很慢,因为只有一个线程可以同时执行指令。如果您的线程几乎将所有时间都花在等待套接字接收或类似上,那么线程就没有问题,并且进程只会增加开销和复杂性而没有任何好处。
  • 第二,“我正在将子进程的字符串键传递给引用套接字的字典”。那么……它是如何得到那本词典的?这个特定于 Unix 的代码是否依赖于在启动时继承父级状态的子级?如果是这样,那么您为什么认为问题与酸洗套接字有关?如果不是,那您为什么认为没有必要进行套接字迁移?
  • 同时,您链接的问题的答案说使用协议 -1 而不是默认值将解决问题。你试过吗?如果有,发生了什么?

标签: python sockets multiprocessing pickle irc


【解决方案1】:

如果您确实想尝试跳出标准库,那么pathos.multiprocessing 的以下代码(正如您所提到的)不应引发酸洗错误,因为dill 序列化程序知道如何序列化套接字和文件句柄。

>>> import socket
>>> import pathos.multiprocessing as mp
>>> import types
>>> import dill as pickle
>>>
>>> class a(object):
...    def __init__(self):
...        sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
...        sock1.connect((socket.gethostbyname("example.com"), 6667))
...        self.servers = {}
...        self.servers["example.com"] = sock1
...    def method(self, hostname):
...        self.servers[hostname].send("JOIN DAN\r\n")
...        print "1"
...    def oth_method(self):
...        pool = mp.ProcessingPool()
...        pool.map(self.method, self.servers.keys())
...        pool.close()
...        pool.join()
...
>>> b = a()
>>> b.oth_method()

然而,一个问题是您需要使用multiprocessing 进行序列化,并且在许多情况下,套接字将序列化,以便关闭反序列化的套接字。原因主要是因为文件描述符没有按预期复制,它是通过引用复制的。使用dill,您可以自定义文件句柄的序列化,这样就可以传输内容而不是使用引用……但是,这对于套接字来说并不能很好地转换(至少目前是这样)。

我是dillpathos 的作者,我不得不同意@abarnert 你可能不想用multiprocessing 这样做(至少不存储服务器地图和插座)。如果你想使用multiprocessing's 线程接口,并且你发现你遇到任何序列化问题,pathos.multiprocessing 确实有mp.ThreadingPool() 而不是mp.ProcessingPool(),这样你就可以访问multiprocessing.dummy.Pool 周围的包装器,但仍然得到pathos 提供的附加功能(例如用于阻塞或异步管道和映射的多参数池等)。

【讨论】:

  • 我很好奇您是如何序列化文件句柄的。 unix_sock.sendmsg(sock)sock.share(pid) 并不是很难编写,但是将它们包装在进程池接口中似乎有点让人头疼。 (另外,处理套接字和文件在 Windows 上是不同的东西的事实……)但这可能与此处继续无关,因此我将下载您的模块并改为阅读它。 :)
  • @abarnert:对于套接字还没有什么聪明的做法。有几个选项(最近在github.com/uqfoundation/dill 上添加,而不是在当前版本中)用于处理序列化文件……它确实需要在 Windows 上进行测试。我知道 Windows 和任何其他操作系统之间的差异。 multiprocessing 的分支非常简单,我只是替换了序列化程序并在顶部添加了一个小的说服层,例如,允许 map 接受多个参数。
【解决方案2】:

您的根本问题是您无法将套接字传递给子进程。简单的解决方案是改用线程。

更详细的:


腌制一个绑定的方法需要腌制三件事:函数名、对象和类。 (我认为multiprocessing 会自动为您执行此操作,但您是手动执行此操作;这很好。)要腌制对象,您必须腌制其成员,在您的情况下包括一个值为套接字的 dict。

您不能使用 Python 2.x 中的默认酸洗协议来酸洗套接字。您链接的问题的答案解释了原因,并提供了简单的解决方法:不要使用默认的酸洗协议。但是socket 还有一个问题;它只是 C 扩展模块中定义的类型的包装器,它有自己的酸洗问题。您也许也可以解决这个问题……

但这仍然无济于事。在幕后,该 C 扩展类本身只是文件描述符的包装器。文件描述符只是一个数字。您的操作系统为每个进程保留文件描述符到打开套接字(以及文件和管道等)的映射;一个进程中的文件#4 不是另一个进程中的文件#4。因此,您需要在操作系统级别将套接字的文件描述符实际迁移到子级。这不是一件简单的事情,而且在每个平台上都不一样。当然,除了迁移文件描述符之外,您还必须传递足够的信息来重新构造socket 对象。所有这些都是可行的;甚至可能有一个库可以为您包装它。但这并不容易。


另一种可能性是在启动任何子级之前打开所有套接字,并将它们设置为由子级继承。但是,即使你可以重新设计你的代码来做这样的事情,这只适用于 POSIX 系统,而不适用于 Windows。


一个更简单的可能性是只使用线程而不是进程。如果您正在做 CPU 密集型工作,那么 Python 中的线程会出现问题(嗯,CPython,您几乎可以肯定正在使用的实现),因为有一个全局解释器锁可以防止两个线程同时解释代码。但是当您的线程将所有时间都花在等待socket.recv 和类似的 I/O 调用上时,使用线程就没有问题了。它们避免了酸洗数据和迁移套接字等的所有开销和复杂性。

您可能会注意到threading 模块没有像multiprocessing 那样好的Pool 类。然而,令人惊讶的是,stdlib 中有一个线程池类——它就在multiprocessing 中。您可以通过multiprocessing.dummy.Pool 访问它。

如果您愿意超越标准库,Python 3 中的 concurrent.futures 模块有一个名为 futures 的反向端口,您可以在 PyPI 上安装它。它包括一个ThreadPoolExecutor,它是围绕池的稍微更高级别的抽象,可能更易于使用。但是Pool 在这里也应该可以正常工作,而且您已经编写了代码。

【讨论】:

  • 谢谢,这是有道理的。早上我去看看,看看能不能搞定
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-09-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-12-29
  • 2020-08-18
相关资源
最近更新 更多