【问题标题】:How to limit the number of simultaneous connections in Twisted如何限制 Twisted 中的同时连接数
【发布时间】:2015-07-23 15:22:36
【问题描述】:

所以我构建了一个扭曲的服务器,我想知道限制同时连接数的最佳方法是什么?

让我的工厂返回 None 是最好的方法吗?当我这样做时,我会抛出很多异常,例如:

exceptions.AttributeError: 'NoneType' object has no attribute 'makeConnection'

我希望以某种方式让客户端坐在队列中,直到当前连接数恢复正常,但我不知道如何异步执行此操作。

目前我正在使用我的工厂这样做:

class HandleClientFactory(Factory):

    def __init__(self):
            self.numConnections = 0 

    def buildProtocol(self, addr):
            #limit connection number here
            if self.numConnections >= Max_Clients:
                    logging.warning("Reached maximum Client connections")
                    return None

            return HandleClient(self)

这可行,但会断开连接而不是等待,并且还会引发许多未处理的错误。

【问题讨论】:

    标签: python python-2.7 asynchronous twisted twisted.internet


    【解决方案1】:

    您必须自己构建它。幸运的是,这些部件大部分都准备好了(你可能会要求更合适的部件,但是......)

    首先,为避免AttributeError(确实会导致连接关闭),请务必从您的buildProtocol 方法返回IProtocol 提供程序。

    class DoesNothing(Protocol):
        pass
    
    class YourFactory(Factory):
        def buildProtocol(self, addr):
            if self.currentConnections < self.maxConnections:
                return Factory.buildProtocol(self, addr)
            protocol = DoesNothing()
            protocol.factory = self
            return protocol
    

    如果您使用这个工厂(填写缺失的部分 - 例如,初始化 maxConnections 并正确跟踪 currentConnections),那么您会发现一旦达到限制就连接的客户端将获得 DoesNothing协议。他们可以向该协议发送尽可能多的数据。它将全部丢弃。它永远不会向他们发送任何数据。它将保持连接打开,直到他们关闭它。简而言之,它什么都不做。

    但是,您还希望客户端在连接数低于限制时真正接收到服务。

    要做到这一点,你需要更多的部分:

    • 您必须将他们可能发送的任何数据保持在缓冲中,以便在您准备好读取时可以读取。
    • 您必须跟踪连接,以便在时机成熟时开始为它们提供服务。
    • 您必须在上述时间开始为他们服务。

    对于第一个,您可以使用大多数传输的功能来“暂停”:

    class PauseTransport(Protocol):
        def makeConnection(self, transport):
            transport.pauseProducing()
    
    class YourFactory(Factory):
        def buildProtocol(self, addr):
            if self.currentConnections < self.maxConnections:
                return Factory.buildProtocol(self, addr)
            protocol = PauseTransport()
            protocol.factory = self
            return protocol
    

    PauseTransportDoesNothing 相似,但有一点(但有用)的区别是,一旦它连接到传输,它就会告诉传输暂停。因此,永远不会从连接中读取任何数据,并且在您准备好处理它时所有数据都将保持缓冲状态。

    对于下一个需求,存在许多可能的解决方案。最简单的一种是将工厂用作存储:

    class PauseAndStoreTransport(Protocol):
        def makeConnection(self, transport):
            transport.pauseProducing()
            self.factory.addPausedTransport(transport)
    
    class YourFactory(Factory):
        def buildProtocol(self, addr):
            # As above
            ...
    
        def addPausedTransport(self, transport):
            self.transports.append(transport)
    

    再一次,通过正确的设置(例如,初始化transports 属性),您现在拥有所有传输的列表,这些传输对应于您已接受的超过并发限制且等待服务的连接。

    对于最后一个要求,所需要做的就是实例化和初始化实际上能够为您的客户提供服务的协议。实例化很容易(这是您的协议,您可能知道它是如何工作的)。初始化主要是调用makeConnection 方法:

    class YourFactory(Factory):
        def buildProtocol(self, addr):
            # As above
            ...
        def addPausedTransport(self, transport):
            # As above
            ...
        def oneConnectionDisconnected(self)
            self.currentConnections -= 1
            if self.currentConnections < self.maxConnections:
                transport = self.transports.pop(0)
                protocol = self.buildProtocol(address)
                protocol.makeConnection(transport)
                transport.resumeProducing()
    

    我省略了跟踪buildProtocol 所需的address 参数的细节(transport 从它的起点带到程序的这一部分,应该清楚如何做某事如果您的程序确实需要,则与原始地址值类似)。

    除此之外,这里发生的所有事情都是您采用下一个排队传输(如果需要,您可以使用不同的调度算法,例如 LIFO)并将其连接到您选择的协议,就像 Twisted 所做的那样。最后,撤消之前的暂停操作,以便数据开始流动。

    或者……几乎。除了 Twisted 传输实际上并没有公开任何方式来更改它们传递数据到哪个协议之外,这将是非常巧妙的。因此,正如所写,来自客户端的数据实际上将被传递到原始的PauseAndStoreTransport 协议实例。您可以解决这个问题(“hack”显然是正确的词)。将传输 PauseAndStoreTransport 实例存储在工厂的列表中,然后:

        def oneConnectionDisconnected(self)
            self.currentConnections -= 1
            if self.currentConnections < self.maxConnections:
                originalProtocol, transport = self.transports.pop(0)
                newProtocol = self.buildProtocol(address)
    
                originalProtocol.dataReceived = newProtocol.dataReceived
                originalProtocol.connectionLost = newProtocol.connectionLost
    
                newProtocol.makeConnection(transport)
                transport.resumeProducing()
    

    现在,传输要调用方法的对象已将其方法替换为要调用方法的对象中的方法。同样,这显然是一个 hack。如果您愿意,您可能可以将一些不那么骇人听闻的东西放在一起(例如,明确支持委托给另一个协议的第三个协议类)。这个想法是一样的——它只会在你的键盘上磨损更多。对于它的价值,我怀疑使用Tubes 做类似的事情可能既容易又少打字,但我暂时将尝试基于该库的解决方案留给其他人。

    我已经避免解决保持currentConnections 正确更新的问题。由于您的问题中已经有numConnections,我假设您知道如何管理该部分。我在最后一步所做的只是假设您执行减量步骤的方式是在工厂调用oneConnectionDisconnected

    我还避免解决排队连接变得无聊并消失的事件。这将主要按书面方式工作 - Twisted 不会注意到连接已关闭,直到您调用 resumeProducing 然后在您的应用程序协议上调用 connectionLost。这应该没问题,因为您的协议无论如何都需要处理丢失的连接。

    【讨论】:

    • 非常感谢!非常透彻且易于理解!你给了一个很好的答案!
    • @Jean-Paul:在你回答的最后一部分,你说你所描述的是hackish。在您看来,为这种用途实施并发连接限制的正确方法是什么?
    猜你喜欢
    • 1970-01-01
    • 2020-10-12
    • 1970-01-01
    • 1970-01-01
    • 2018-09-12
    • 1970-01-01
    • 2019-09-11
    • 2019-02-23
    • 2021-09-16
    相关资源
    最近更新 更多