【问题标题】:Massconnect loss of attribute socket属性套接字的 Massconnect 丢失
【发布时间】:2013-02-25 11:36:08
【问题描述】:

我正在运行 massconnect.py 模块以模拟打开许多并行 web-socket 连接,但是当我尝试在一段时间后打开 15000 个连接(在 massconnect json 中指定)时,我收到以下错误消息:

Unhandled Error
Traceback (most recent call last):
  File "C:\Python27\lib\site-packages\twisted\python\log.py", line 88, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\log.py", line 73, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\context.py", line 81, in callWithContext
    return func(*args,**kw)
--- <exception caught here> ---
  File "C:\Python27\lib\site-packages\twisted\internet\iocpreactor\reactor.py", line 120, in _callEventCallback
    evt.callback(rc, bytes, evt)
  File "C:\Python27\lib\site-packages\twisted\internet\iocpreactor\tcp.py", line 285, in cbConnect
    self.socket.setsockopt(
exceptions.AttributeError: 'Client' object has no attribute 'socket'

web-socket 连接的打开被中断,我得到了这个异常。

我调整了我的 massconnect.py 模块来确认应用程序的 web-socket 身份验证。我正在测试,以便每次分配客户端时都会发送唯一的身份验证消息。

我已经在注册表中将MaxUserPort 设置为 65534。我认为这个问题可能会导致我的内存不足,但在使用不同的机器检查后,我意识到这不是原因。

这可能是一个已知的 python/twisted 问题吗?

@字形:

这里是massconnect.py源代码:

'''
Created on Jan 29, 2013

@author: boro.petrovic
'''
###############################################################################
##
##  Copyright 2011,2012 Tavendo GmbH
##
##  Licensed under the Apache License, Version 2.0 (the "License");
##  you may not use this file except in compliance with the License.
##  You may obtain a copy of the License at
##
##      http://www.apache.org/licenses/LICENSE-2.0
##
##  Unless required by applicable law or agreed to in writing, software
##  distributed under the License is distributed on an "AS IS" BASIS,
##  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
##  See the License for the specific language governing permissions and
##  limitations under the License.
##
###############################################################################

import sys
import time
from twisted.internet import reactor
from twisted.internet import error
from twisted.internet.defer import Deferred, returnValue, inlineCallbacks
from twisted.internet.interfaces import IReactorTime
from autobahn.websocket import connectWS
from autobahn.websocket import WebSocketClientFactory, WebSocketClientProtocol
import time

f='start'

class MassConnectProtocol(WebSocketClientProtocol):   

    def sendHello(self, message):
        #self.sendMessage("Hello from Python!")
        self.sendMessage(payload=message)
        #reactor.callLater(2, self.sendHello(message='test'))

    def onOpen(self):
        global f
        wstoken = f.readline().rstrip()
        message = 'messagecontent'+wstoken
        self.factory.test.onConnected()
        self.sendHello(message)
        self.wstoken = wstoken

    def onMessage(self, msg, binary):
        print "Got message from wstoken " + self.wstoken + " :" + msg

class MassConnectFactory(WebSocketClientFactory):

    def clientConnectionFailed(self, connector, reason):
        if self.test.onFailed():
            reactor.callLater(float(self.retrydelay)/1000., connector.connect)

    def clientConnectionLost(self, connector, reason):
        if self.test.onLost():
            reactor.callLater(float(self.retrydelay)/1000., connector.connect)

class MassConnect:

    def __init__(self, name, uri, connections, batchsize, batchdelay, retrydelay):
        self.name = name
        self.uri = uri
        self.batchsize = batchsize
        self.batchdelay = batchdelay
        self.retrydelay = retrydelay
        self.failed = 0
        self.lost = 0
        self.targetCnt = connections
        self.currentCnt = 0
        self.actual = 0

    def run(self):
        self.d = Deferred()
        self.started = time.clock()
        self.connectBunch()
        return self.d

    def onFailed(self):
        self.failed += 1
        return True

    def onLost(self):
        self.lost += 1
        return True

    def onConnected(self):
        #sprint "connect"
        self.actual += 1
        if self.actual % self.batchsize == 0:
            sys.stdout.write(".")
        if self.actual == self.targetCnt:
            self.ended = time.clock()
            duration = self.ended - self.started
            print " connected %d clients to %s at %s in %s seconds (retries %d = failed %d + lost %d)" % (self.currentCnt, self.name, self.uri, duration, self.failed + self.lost, self.failed, self.lost)
            try:
                reactor.run()
            except twisted.internet.error.ReactorAlreadyRunning:
                pass

            result = {'name': self.name,
                  'uri': self.uri,
                  'connections': self.targetCnt,
                  'retries': self.failed + self.lost,
                  'lost': self.lost,
                  'failed': self.failed,
                  'duration': duration}
            self.d.callback(result)

    def connectBunch(self):

        if self.currentCnt + self.batchsize < self.targetCnt:
            c = self.batchsize
            redo = True
        else:
            c = self.targetCnt - self.currentCnt
            redo = False
        for i in xrange(0, c):
            factory = MassConnectFactory(self.uri, origin=None, protocols=[("myprotocol")])
            factory.test = self
            factory.retrydelay = self.retrydelay
            factory.protocol = MassConnectProtocol
            factory.setProtocolOptions(version=13)
            factory.setSessionParameters(url="myurl", origin=None, protocols=["myprotocol"])
            connectWS(factory)
            self.currentCnt += 1
        if redo:
            reactor.callLater(float(self.batchdelay)/1000., self.connectBunch)

class MassConnectTest:

    def __init__(self, spec):
        global f
        f = open("C:/tokens.txt", "r")
        self.spec = spec

    @inlineCallbacks
    def run(self):
        global f
        res = []
        for s in self.spec['servers']:
            t = MassConnect(s['name'],
                            s['uri'],
                            self.spec['options']['connections'],
                            self.spec['options']['batchsize'],
                            self.spec['options']['batchdelay'],
                            self.spec['options']['retrydelay'])
            r = yield t.run()
            res.append(r)
        returnValue(res)
        f.close()

在脚本中,我用虚拟字符串替换了机密数据。

从 token.txt 文件中,我正在读取我通过 web-socket 连接到被测应用程序的 15000 个用户中的每个用户的身份验证令牌。 通过这种方式,我模拟了许多不同用户的连接,并且对于每个用户,我都有活动的独立 Web 套接字连接。

不幸的是,我没有达到建立所有 15000 个连接的地步,因为在运行脚本(建立连接)时,我收到了未处理的错误消息。

【问题讨论】:

  • 请注明massconnect.py的出处。
  • @Glyph,很抱歉打扰你,但你想看看我的question吗?

标签: python-2.7 twisted autobahn


【解决方案1】:

如果您将massconnect.py 减少为SSCCE,将会很有帮助。但是,如果问题仅在数千个并发连接的级别出现,您可能会用完文件描述符(抱歉,Twisted 的错误消息不是很好)。

有关在操作系统中更改此参数的一些信息,请参阅this question

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2011-05-19
    • 1970-01-01
    • 1970-01-01
    • 2019-08-16
    • 2015-01-28
    • 2017-02-16
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多