【问题标题】:Why does flask_socketio.disconnect prevent the thread from completing?为什么 flask_socketio.disconnect 会阻止线程完成?
【发布时间】:2020-02-12 08:56:56
【问题描述】:

我有以下系统:[Client] - [Web Server] - [Connecotr]

连接器是网络服务器和数据源之间的一种中间代码。

我需要监控服务器与连接器的连接。如果连接丢失,那么我必须通知客户端。

Web 服务器和连接器之间的通信是使用 socketio 组织的。

问题在于,如果连接器停止工作,那么 Web 服务器将在一分钟后知道它(这是最好的情况)。

我决定服务器应该每秒检查一次连接器的状态。

当连接器连接到服务器时,后台任务会启动。任务的本质:每一秒:1)固定时间; 2)将固定时间保存到堆栈中; 3) 将回显消息发送到连接器。 (参见 server.background_thread)

连接器接受回显消息和时间戳作为参数并将回显消息发送到 Web 服务器,作为传递它接收的时间戳的参数。 (见client.echo)

Web 服务器收到回显消息,如果时间戳等于堆栈中的最后一个值,则从堆栈中删除该值。 (见 server.on_echo_connector)

在 Web 服务器上,每次迭代都会检查堆栈大小(请参阅 server.background_thread)。如果大于5,则表示连接器5次没有响应回显消息,我们认为连接器不可用,断开连接。

当服务器意识到连接器不可用时,需要终止向连接器发送回显消息的线程。

一旦堆栈大小大于 5,我退出无限循环并调用flask_socketio.disconnect (connector_sid, '/ connector')。在这个电话之后没有任何效果(例如print

on_disconnect_connector(服务器)方法中,thread.join() 被调用并且永不终止。

而且我需要完成线程,这样当连接器再次启动时,它会连接成功,一切重新开始。

如何解决这个问题?

服务器

# -*- coding: utf-8 -*-

import os
import threading
import time
import collections
from datetime import datetime

import flask
import flask_socketio

def get_unix_time():
    return int(time.mktime(datetime.now().timetuple()))

class Stack(collections.deque):

    def __init__(self, iterable=(), maxlen=None):
        collections.deque.__init__(self, iterable, maxlen)

    @property
    def size(self):
        return len(self)

    @property
    def empty(self):
        return self.size == 0

    @property
    def head(self):
        return self[-1]

    @property
    def tail(self):
        return self[0]

    def push(self, x):
        self.append(x)

# SERVER

app = flask.Flask(__name__)
sio = flask_socketio.SocketIO(app, async_mode='gevent')

connector_sid = None
echo_stack = Stack()

thread = None
thread_lock = threading.Lock()


def background_thread(app):
    time.sleep(2)  # delay for normal connection

    while True:
        if echo_stack.size >= 5:
            break
        time_ = get_unix_time()
        echo_stack.push(time_)
        sio.emit('echo', time_, namespace='/connector')
        sio.sleep(1)

    with app.app_context():
        flask_socketio.disconnect(connector_sid, '/connector')


@sio.on('connect', namespace='/connector')
def on_connect_connector():
    """Connector connection event handler."""
    global connector_sid, thread
    print 'Attempt to connect a connector {}...'.format(request.sid)

    # if the connector is already connected, reject the connection
    if connector_sid is not None:
        print 'Connection for connector {} rejected'.format(request.sid)
        return False
        # raise flask_socketio.ConnectionRefusedError('Connector already connected')

    connector_sid = request.sid
    print('Connector {} connected'.format(request.sid))

    with thread_lock:
        if thread is None:
            thread = sio.start_background_task(
                background_thread, current_app._get_current_object())

    # notify clients about connecting a connector
    sio.emit('set_connector_status', True, namespace='/client')


@sio.on('disconnect', namespace='/connector')
def on_disconnect_connector():
    """Connector disconnect event handler."""
    global connector_sid, thread

    print 'start join'
    thread.join()
    print 'end join'
    thread = None
    print 'after disconet:', thread

    connector_sid = None

    echo_stack.clear()

    print('Connector {} disconnect'.format(request.sid))

    # notify clients of disconnected connector
    sio.emit('set_connector_status', False, namespace='/client')


@sio.on('echo', namespace='/connector')
def on_echo_connector(time_):
    if not echo_stack.empty:
        if echo_stack.head == time_:
            echo_stack.pop()


@sio.on('message', namespace='/connector')
def on_message_connector(cnt):
    # print 'Msg: {}'.format(cnt)
    pass

if __name__ == '__main__':
    sio.run(app)

客户

# -*- coding: utf-8 -*-

import sys
import threading
import time

import socketio
import socketio.exceptions

sio = socketio.Client()
thread = None
thread_lock = threading.Lock()
work = False


def background_thread():
    # example task
    cnt = 0
    while work:
        cnt += 1
        if cnt % 10 == 0:
            sio.emit('message', cnt // 10, namespace='/connector')
        sio.sleep(0.1)


@sio.on('connect', namespace='/connector')
def on_connect():
    """Server connection event handler."""
    global thread, work

    print '\n-----            Connected to server            -----' \
          '\n----- My SID:  {} -----\n'.format(sio.sid)

    work = True  # set flag

    # run test task
    with thread_lock:
        if thread is None:
            thread = sio.start_background_task(background_thread)


@sio.on('disconnect', namespace='/connector')
def on_disconnect():
    """Server disconnect event handler."""
    global thread, work

    # clear the work flag so that at the next iteration the endless loop ends
    work = False
    thread.join()
    thread = None

    # disconnect from server
    sio.disconnect()
    print '\n-----         Disconnected from server          -----\n'

    # switch to the mode of infinite attempts to connect to the server
    main()


@sio.on('echo', namespace='/connector')
def on_echo(time_):
    sio.emit('echo', time_, namespace='/connector')


def main():
    while True:
        try:
            sio.connect('http://localhost:5000/connector',
                        namespaces=['/connector'])
            sio.wait()
        except socketio.exceptions.ConnectionError:
            print 'Trying to connect to the server...'
            time.sleep(1)
        except KeyboardInterrupt:
            print '\n---------- EXIT ---------\n'
            sys.exit()
        except Exception as e:
            print e


if __name__ == '__main__':
    print '\n---------- START CLIENT ----------\n'
    main()

Python 2.7

【问题讨论】:

  • 为什么需要一分钟才能检测到断开连接?这仅适用于 HTTP 连接,但当连接在 WebSocket 上时,会立即检测到断开连接。您是否验证过您的客户端是否通过 WebSocket 连接?
  • @Miguel,不,您是在为客户谈论“connection_transports”吗?
  • @Miguel,connection_transports == 无。我将客户端连接更新为 sio.connect('http://localhost:5000/connector', namespaces=['/connector'], transports = ['websocket'])。但是现在客户端甚至没有连接到服务器。每次连接后“sio.connected == True”,但是服务端看不到客户端,客户端无限循环连接。
  • 不,我要问的是您的服务器是否支持通过 WebSocket 的连接。如果你使用 Flask 网络服务器,不支持 WebSocket,你需要使用 eventlet 或 gevent 网络服务器。
  • @Miguel,是的,我已经安装了 gevent 和 gevent-websocket 包。问题是客户端没有切换到 websocket 传输。我找到了解决这个问题的方法。谢谢。

标签: python flask-socketio gevent-socketio python-socketio


【解决方案1】:

需要为客户端安装额外的库 (see)

pip install "python-socketio[client]"

感谢这个库,WebSocket 传输工作。现在断开连接器立即可见。

【讨论】:

    猜你喜欢
    • 2020-01-16
    • 2021-05-05
    • 1970-01-01
    • 2020-08-23
    • 2023-03-06
    • 2016-07-13
    • 1970-01-01
    • 1970-01-01
    • 2021-08-29
    相关资源
    最近更新 更多