【问题标题】:Should a TCP client be able to pause the server, when the TCP server reads a non-blocking socket当 TCP 服务器读取非阻塞套接字时,TCP 客户端是否应该能够暂停服务器
【发布时间】:2020-11-26 10:43:23
【问题描述】:

概述

我有一个简单的问题,代码如下。希望我没有在代码中犯错误。

我是一名网络工程师,我需要在网络中断期间测试我们的业务应用程序 keepalives 的某些 linux 行为(稍后我将插入一些 iptables 的东西来连接连接 - 首先我想确保我让客户端和服务器正确)。

作为我正在进行的网络故障测试的一部分,我编写了一个非阻塞 Python TCP 客户端和服务器,它们应该在循环中盲目地相互发送消息。为了了解发生了什么,我正在使用循环计数器。

服务器的循环应该相对简单。我循环遍历select 说准备好的每个 fd。我什至从未在我的服务器代码中的任何地方导入sleep。从这个角度来看,我不希望服务器的代码在它通过客户端的套接字 循环时暂停,但由于某种原因,服务器代码会间歇性地暂停(更多详细信息,请参见下文)。

我最初并没有在客户端的循环中设置睡眠。如果客户端没有睡眠,服务器和客户端似乎就像我想要的一样高效。但是,当我在客户端向服务器执行fd.send() 之后添加time.sleep(1) 语句时,TCP 服务器代码间歇地在客户端睡眠时暂停。 p>

我的问题:

  • 我应该能够编写一个单线程 Python TCP 服务器,当客户端在客户端的 fd.send() 循环中点击 time.sleep() 时不会暂停?如果是这样,我做错了什么?
  • 如果我正确编写了此测试代码并且服务器不应暂停,为什么 TCP 服务器在轮询客户端连接以获取数据时间歇性暂停?李>

重现场景

我在两台 RHEL6 linux 机器上运行它。重现问题...

  • 打开两个不同的终端。
  • 将客户端和服务器脚本保存在不同的文件中
  • 将 shebang 路径更改为本地 python(我使用的是 Python 2.7.15)
  • 将客户端代码中的 SERVER_HOSTNAMESERVER_DOMAIN 更改为您正在运行它的服务器的主机名和域
  • 先启动服务器,再启动客户端。

客户端连接后,您会看到如图 1 所示的消息在服务器终端快速滚动。 几秒钟后当客户端点击time.sleep()时,滚动暂停间歇性。我不希望看到这些停顿,但也许我误解了一些东西。

展览 1

---
LOOP_COUNT 0
---
LOOP_COUNT 1
---
LOOP_COUNT 2
---
LOOP_COUNT 3
CLIENTMSG: 'client->server 0'
---
LOOP_COUNT 4
---
LOOP_COUNT 5
---
LOOP_COUNT 6
---
LOOP_COUNT 7
---
LOOP_COUNT 8
---
LOOP_COUNT 9
---
LOOP_COUNT 10
---
LOOP_COUNT 11
---

摘要分辨率

如果我正确编写了这段测试代码并且服务器不应该暂停,为什么 TCP 服务器在轮询客户端连接以获取数据时会间歇性暂停?

回答我自己的问题。我的阻塞问题是由非零超时调用 select() 引起的。

当我将 select() 更改为使用零秒超时时,我得到了预期的结果。

最终的非阻塞代码(在答案中包含建议):

tcp_server.py

#!/usr/bin/python -u
from socket import AF_INET, SOCK_STREAM, SO_REUSEADDR, SOL_SOCKET
from socket import MSG_DONTWAIT
#from socket import MSG_OOB  <--- for send()
from socket import socket
import socket as socket_module
import select
import errno
import fcntl
import time
import sys
import os

def get_errno_info(e, op='', debugmsg=False):
    """Return verbose information from errno errors, such as errors returned by python socket()"""
    VALID_OP = set(['accept', 'connect', 'send', 'recv', 'read', 'write'])
    assert op.lower() in VALID_OP, "op must be: {0}".format(
        ','.join(sorted(VALID_OP)))

    ## ref: man 3 errno (in linux)... other systems may be man 2 intro
    ##   also see https://docs.python.org/2/library/errno.html
    try:
        retval_int = int(e.args[0])         # Example: 32
        retval_str = os.strerror(e.args[0]) # Example: 'Broken pipe'
        retval_code = errno.errorcode.get(retval_int, 'MODULEFAIL') # Ex: EPIPE
    except:
        ## I don't expect to get here unless something broke in python errno...
        retval_int  = -1
        retval_str  = '__somethingswrong__'
        retval_code = 'BADFAIL'

    if debugmsg:
        print "DEBUG: Can't {0}() on socket (errno:{1}, code:{2} / {3})".format(
            op, retval_int, retval_code, retval_str)
    return retval_int, retval_str, retval_code


host = ''
port = 6667     # IRC service
DEBUG = True

serv_sock = socket(AF_INET, SOCK_STREAM)
serv_sock.setsockopt(SOL_SOCKET, SOCK_STREAM, 1)
serv_sock.bind((host, port))
serv_sock.listen(5)

#fcntl.fcntl(serv_sock, fcntl.F_SETFL, os.O_NONBLOCK)  # Make the socket non-blocking
serv_sock.setblocking(False)

sock_list = [serv_sock]

from_client_str = '__DEFAULT__'

to_client_idx = 0
loop_count = 0
need_send_select = False
while True:
    if need_send_select:
        # Only do this after send() EAGAIN or EWOULDBLOCK...
        send_sock_list = sock_list
    else:
        send_sock_list = []

    #print "---"
    #print "LOOP_COUNT",  loop_count

    recv_ready_list, send_ready_list, exception_ready = select.select(
        sock_list, send_sock_list, [], 0.0)  # Last float is the select() timeout...


    ## Read all sockets which are output-ready... might be client or server...
    for sock_fd in recv_ready_list:

        # accept() if we're reading on the server socket...
        if sock_fd is serv_sock:
            try:
                clientsock, clientaddr = sock_fd.accept()
            except socket_module.error, e:
                errstr, errint, errcode = get_errno_info(e, op='accept',
                    debugmsg=DEBUG)

            assert sock_fd.gettimeout()==0.0, "client socket should be in non-blocking mode"
            sock_list.append(clientsock)

        # read input from the client socket...
        else:
            try:
                from_client_str = sock_fd.recv(1024, MSG_DONTWAIT)
                if from_client_str=='':
                    # Client closed the socket...
                    print "CLIENT CLOSED SOCKET"
                    sock_list.remove(sock_fd)
            except socket_module.error, e:
                errstr, errint, errcode = get_errno_info(e, op='recv',
                    debugmsg=DEBUG)
                if errcode=='EAGAIN' or errcode=='EWOULDBLOCK':
                    # socket unavailable to read()
                    continue
                elif errcode=='ECONNRESET' or errcode=='EPIPE':
                    # Client closed the socket...
                    sock_list.remove(sock_fd)
                else:
                    print "UNHANDLED SOCKET ERROR", errcode, errint, errstr
                    sys.exit(1)


            print "from_client_str: '{0}'".format(from_client_str)

    ## Adding dynamic_list, per input from EJP, below...
    if need_send_select is False:
        dynamic_list = sock_list
    else:
        dynamic_list = send_ready_list
    ## NOTE:  socket code shouldn't walk this list unless a write is pending...
    ##      broadast the same message to all clients...
    for sock_fd in dynamic_list:

        ## Ignore server's listening socket...
        if sock_fd is serv_sock:
            ## Only send() to accept()ed sockets...
            continue

        try:

            to_client_str = "server->client: {0}\n".format(to_client_idx)
            send_retval = sock_fd.send(to_client_str, MSG_DONTWAIT)
            ## send() returns the number of bytes written, on success
            ##     disabling assert check on sent bytes while using MSG_DONTWAIT
            #assert send_retval==len(to_client_str)

            to_client_idx += 1
            need_send_select = False
        except socket_module.error, e:
            errstr, errint, errcode = get_errno_info(e, op='send',
                debugmsg=DEBUG)
            if errcode=='EAGAIN' or errcode=='EWOULDBLOCK':
                need_send_select = True
                continue
            elif errcode=='ECONNRESET' or errcode=='EPIPE':
                # Client closed the socket...
                sock_list.remove(sock_fd)
            else:
                print "FATAL UNHANDLED SOCKET ERROR", errcode, errint, errstr
                sys.exit(1)

    loop_count += 1

tcp_client.py

#!/usr/bin/python -u
from socket import AF_INET, SOCK_STREAM
from socket import MSG_DONTWAIT    # non-blocking send/recv; see man 2 recv
from socket import gethostname, socket
import socket as socket_module
import select
import fcntl
import errno
import time
import sys
import os

## NOTE: Using this script to simulate a scheduler
SERVER_HOSTNAME = 'myServerHostname'
SERVER_DOMAIN = 'mydomain.local'
PORT = 6667
DEBUG = True

def get_errno_info(e, op='', debugmsg=False):
    """Return verbose information from errno errors, such as errors returned by python socket()"""
    VALID_OP = set(['accept', 'connect', 'send', 'recv', 'read', 'write'])
    assert op.lower() in VALID_OP, "op must be: {0}".format(
        ','.join(sorted(VALID_OP)))

    ## ref: man 3 errno (in linux)... other systems may be man 2 intro
    ##   also see https://docs.python.org/2/library/errno.html
    try:
        retval_int = int(e.args[0])         # Example: 32
        retval_str = os.strerror(e.args[0]) # Example: 'Broken pipe'
        retval_code = errno.errorcode.get(retval_int, 'MODULEFAIL') # Ex: EPIPE
    except:
        ## I don't expect to get here unless something broke in python errno...
        retval_int  = -1
        retval_str  = '__somethingswrong__'
        retval_code = 'BADFAIL'

    if debugmsg:
        print "DEBUG: Can't {0}() on socket (errno:{1}, code:{2} / {3})".format(
            op, retval_int, retval_code, retval_str)
    return retval_int, retval_str, retval_code


connect_finished = False
while not connect_finished:
    try:
        c2s = socket(AF_INET, SOCK_STREAM) # Client to server socket...
        # Set socket non-blocking
        #fcntl.fcntl(c2s, fcntl.F_SETFL, os.O_NONBLOCK)
        c2s.connect(('.'.join((SERVER_HOSTNAME, SERVER_DOMAIN,)), PORT))
        c2s.setblocking(False)
        assert c2s.gettimeout()==0.0, "c2s socket should be in non-blocking mode"
        connect_finished = True
    except socket_module.error, e:
        errstr, errint, errcode = get_errno_info(e, op='connect',
            debugmsg=DEBUG)
        if errcode=='EINPROGRESS':
            pass

to_srv_idx = 0
need_send_select = False
while True:
    socket_list = [c2s]

    # Get the list sockets which can: take input, output, etc...
    if need_send_select:
        # Only do this after send() EAGAIN or EWOULDBLOCK...
        send_sock_list = socket_list
    else:
        send_sock_list = []
    recv_ready_list, send_ready_list, exception_ready = select.select(
        socket_list, send_sock_list, [])

    for sock_fd in recv_ready_list:
        assert sock_fd is c2s, "Strange socket failure here"

        #incoming message from remote server
        try:
            from_srv_str = sock_fd.recv(1024, MSG_DONTWAIT)
        except socket_module.error, e:
            ## https://stackoverflow.com/a/16745561/667301
            errstr, errint, errcode = get_errno_info(e, op='recv',
                debugmsg=DEBUG)
            if errcode=='EAGAIN' or errcode=='EWOULDBLOCK':
                # Busy, try again later...
                print "recv() BLOCKED"
                continue
            elif errcode=='ECONNRESET' or errcode=='EPIPE':
                # Server ended normally...
                sys.exit(0)

        ## NOTE: if we get this far, we successfully received from_srv_str.
        ##    Anything caught above, is some kind of fail...
        print "from_srv_str: {0}".format(from_srv_str)

    ## Adding dynamic_list, per input from EJP, below...
    if need_send_select is False:
        dynamic_list = socket_list
    else:
        dynamic_list = send_ready_list
    for sock_fd in dynamic_list:
        # outgoing message to remote server
        if sock_fd is c2s:
            try:
                to_srv_str = 'client->server {0}'.format(to_srv_idx)
                sock_fd.send(to_srv_str, MSG_DONTWAIT)

                               ##
                time.sleep(1)  ## Client blocks the server here... Why????
                               ##

                to_srv_idx += 1
                need_send_select = False
            except socket_module.error, e:
                errstr, errint, errcode = get_errno_info(e, op='send',
                    debugmsg=DEBUG)
                if errcode=='EAGAIN' or errcode=='EWOULDBLOCK':
                    ## Try to send() later...
                    print "send() BLOCKED"
                    need_send_select = True
                    continue
                elif errcode=='ECONNRESET' or errcode=='EPIPE':
                    # Server ended normally...
                    sys.exit(0)

原问题代码:

tcp_server.py

#!/usr/bin/python -u
from socket import AF_INET, SOCK_STREAM, SO_REUSEADDR, SOL_SOCKET
#from socket import MSG_OOB  <--- for send()
from socket import socket
import socket as socket_module
import select
import fcntl
import os

host = ''
port = 9997

serv_sock = socket(AF_INET, SOCK_STREAM)
serv_sock.setsockopt(SOL_SOCKET, SOCK_STREAM, 1)
serv_sock.bind((host, port))
serv_sock.listen(5)

fcntl.fcntl(serv_sock, fcntl.F_SETFL, os.O_NONBLOCK)  # Make the socket non-blocking

sock_list = [serv_sock]

from_client_str = '__DEFAULT__'

to_client_idx = 0
loop_count = 0
while True:
    recv_ready_list, send_ready_list, exception_ready = select.select(sock_list, sock_list,
        [], 5)

    print "---"
    print "LOOP_COUNT",  loop_count

    ## Read all sockets which are input-ready... might be client or server...
    for sock_fd in recv_ready_list:

        # accept() if we're reading on the server socket...
        if sock_fd is serv_sock:
            clientsock, clientaddr = sock_fd.accept()
            sock_list.append(clientsock)

        # read input from the client socket...
        else:
            try:
                from_client_str = sock_fd.recv(4096)
                if from_client_str=='':
                    # Client closed the socket...
                    print "CLIENT CLOSED SOCKET"
                    sock_list.remove(sock_fd)
            except socket_module.error, e:
                print "WARNING RECV FAIL"


            print "from_client_str: '{0}'".format(from_client_str)

    for sock_fd in send_ready_list:
        if sock_fd is not serv_sock:
            try:
                to_client_str = "server->client: {0}\n".format(to_client_idx)
                sock_fd.send(to_client_str)
                to_client_idx += 1
            except socket_module.error, e:
                print "TO CLIENT SEND ERROR", e

    loop_count += 1

tcp_client.py

#!/usr/bin/python -u
    
from socket import AF_INET, SOCK_STREAM
from socket import gethostname, socket
import socket as socket_module
import select
import fcntl
import errno
import time
import sys
import os

## NOTE: Using this script to simulate a scheduler
SERVER_HOSTNAME = 'myHostname'
SERVER_DOMAIN = 'mydomain.local'
PORT = 9997

def handle_socket_error_continue(e):
    ## non-blocking socket info from:
    ## https://stackoverflow.com/a/16745561/667301
    print "HANDLE_SOCKET_ERROR_CONTINUE"
    err = e.args[0]
    if (err==errno.EAGAIN) or (err==errno.EWOULDBLOCK):
        print 'CLIENT DEBUG: No data input from server'
        return True
    else:
        print 'FROM SERVER RECV ERROR: {0}'.format(e)
        sys.exit(1)

c2s = socket(AF_INET, SOCK_STREAM) # Client to server socket...
c2s.connect(('.'.join((SERVER_HOSTNAME, SERVER_DOMAIN,)), PORT))
# Set socket non-blocking...
fcntl.fcntl(c2s, fcntl.F_SETFL, os.O_NONBLOCK)

to_srv_idx = 0
while True:
    socket_list = [c2s]

    # Get the list sockets which can: take input, output, etc...
    recv_ready_list, send_ready_list, exception_ready = select.select(
        socket_list, socket_list, [])

    for sock_fd in recv_ready_list:
        assert sock_fd is c2s, "Strange socket failure here"

        #incoming message from remote server
        try:
            from_srv_str = sock_fd.recv(4096)
        except socket_module.error, e:
            ## https://stackoverflow.com/a/16745561/667301
            err_continue = handle_socket_error_continue(e)
            if err_continue is True:
                continue
        else:
            if len(from_srv_str)==0:
                print "SERVER CLOSED NORMALLY"
                sys.exit(0)

        ## NOTE: if we get this far, we successfully received from_srv_str.
        ##    Anything caught above, is some kind of fail...
        print "from_srv_str: {0}".format(from_srv_str)

    for sock_fd in send_ready_list:
        #incoming message from remote server
        if sock_fd is c2s:
            #to_srv_str = raw_input('Send to server: ')
            try:
                to_srv_str = 'client->server {0}'.format(to_srv_idx)
                sock_fd.send(to_srv_str)

                               ##
                time.sleep(1)  ## Client blocks the server here... Why????
                               ##

                to_srv_idx += 1
            except socket_module.error, e:
                print "TO SERVER SEND ERROR", e

【问题讨论】:

    标签: python sockets tcp nonblocking


    【解决方案1】:

    TCP 套接字几乎总是准备好写入,除非它们的套接字发送缓冲区已满。

    因此,始终为套接字选择可写性是不正确的。您应该只在遇到由于 EAGAIN/EWOULDBLOCK 导致的发送失败后才这样做。否则,您的服务器将无意识地处理可写套接字,这通常是所有这些。

    【讨论】:

    • 我明白你在说什么。您是否暗示这是因为选择循环中的客户端send() 而发生的?另外,如果我不在send() 端使用select(),我很好奇send() 到TCP 零窗口的情况。 TCP 零窗口是否会发生 EAGAIN/EWOULDBLOCK?
    • @MikePennington 与客户无关。服务器在不知不觉中旋转。如果接收窗口关闭,发送缓冲区将填满,当它填满时,您将获得 EAGAIN/EWOULDBLOCK。
    • @EJP 当我测试它时,服务器实际上没有旋转 CPU,因为客户端套接字的传出数据缓冲区几乎总是 100% 满,因此客户端套接字并没有经常选择准备写入。实际上,服务器端的错误(总是盲目地选择准备写入)被客户端的“错误”(经常睡觉并且没有及时从客户端套接字读取)掩盖。
    【解决方案2】:

    但是,当我在客户端执行 time.sleep(1) 语句后 fd.send() 到服务器,TCP服务器代码间歇性暂停 当客户睡觉时。

    AFAICT 从运行提供的代码(很好的独立示例,顺便说一句),服务器按预期运行。

    特别是,select() 调用的语义是 select() 不应该返回,直到线程有事情要做。将线程块放在select() 中是一件好事,因为此时线程无论如何都无法执行任何操作,因为它会阻止线程无缘无故地旋转 CPU。

    所以在这种情况下,您的服务器程序告诉select(),它希望select() 仅在至少满足以下条件之一时返回:

    1. serv_sock 准备好读取(也就是说,一个新的客户端现在要连接到服务器)
    2. serv_sock 已准备好写入(我不相信这实际上发生在侦听套接字上,因此可以忽略此标准)
    3. clientsock 已准备好读取(即客户端已向服务器发送了一些字节,它们正在 clientsock 的缓冲区中等待服务器线程到 recv() 它们)
    4. clientsock 已准备好写入(即,clientsock 在其传出数据缓冲区中有一些空间,如果服务器想要将一些数据发送回客户端,则可以将send() 数据放入其中)
    5. 自对 select() 的调用开始阻塞以来已经过去了 5 秒。

    我看到(通过打印调试)当您的服务器程序阻塞时,它在 select() 内部阻塞,这表明在阻塞期间以上 5 个条件都没有得到满足。

    这是为什么呢?好吧,让我们来看看列表。

    1. 未满足,因为没有其他客户端尝试连接
    2. 未满足,因为这从未发生过
    3. 未满足,因为服务器已读取连接的客户端已发送的所有数据(并且由于连接的客户端本身处于休眠状态,因此不再发送任何数据)
    4. 未满足,因为服务器已填满其clientsock 的传出数据缓冲区(因为客户端程序正在休眠,它只是间歇性地读取来自服务器的数据,TCP 层保证无损/有序传输,因此一旦clientsock 的传出数据缓冲区已满,clientsock 将不会选择写入准备就绪,除非/直到客户端从其连接末尾读取至少一些数据)
    5. 未满足,因为自 select() 开始阻止以来尚未过去 5 秒。

    那么这种行为实际上是服务器的问题吗?事实上它不是,因为服务器仍然会响应连接到服务器的任何其他客户端。特别是,只要serv_sock 或任何其他客户端的套接字select()s 准备好读取(或准备好写入),select() 仍会立即返回,因此服务器可以很好地处理其他客户端,同时等待你被黑/慢的客户端唤醒。

    被黑/速度慢的客户端可能是用户的问题,但服务器对此无能为力(除了强制断开客户端的 TCP 连接,或者打印出日志消息请求某人调试连接的客户端程序,我想:))。

    我同意 EJP 的观点,顺便说一句——只应在您真正想要写入一些数据的套接字上进行选择准备写入。如果您实际上并不想尽快写入套接字,那么指示select() 在该套接字准备好写入后立即返回是没有意义且适得其反的:这样做的问题是您每当任何套接字的传出数据缓冲区未满时(在大多数应用程序中,大部分时间都是如此!),都可能使 CPU 旋转 很多。该问题的用户可见症状是您的服务器程序正在使用 100% 的 CPU 内核,即使它本应空闲或大部分空闲。

    【讨论】:

    • 我突然想到sleep() 语句可能会导致客户端套接字无法写入,因为在sleep() 期间缓冲区已满?我想在这个问题上慢一会儿,但你说的很有道理。
    • 是的,你的假设是正确的。 (如果你想打个比方,可以把 TCP 连接想象成一对塑料管——一个管子用于向每个方向传输数据。在正常情况下,服务器将弹珠推入管子的末端,客户端弹出弹珠从另一端出来;但是当客户端调用sleep()时,它一端的弹珠不会经常弹出,最终管子会填满,这样服务器就不能再推入弹珠了,直到下一个客户醒来并取出一些的时间)
    • 好的,但如果是这种情况,我希望服务器端 send() 在客户端休眠时获得 EAGAIN/EWOULDBLOCK,但这没有发生(我编辑了我的测试以包含更多 @987654348 @ 处理,所以每当我遇到套接字异常时都应该有明确的警告)。出于某种原因,即使进行了所有更改(包括 EJP 建议的更改),当客户端暂停时,服务器仍然暂停;尽管send() 缓冲区已满,服务器看起来并没有暂停。关于我最近一次尝试中暂停它的原因有什么想法吗?
    • send() 将返回 -1 并将 errno 设置为 EWOULDBLOCK 如果您在输出缓冲区已满的(非阻塞)套接字上调用 send()。但是,您的服务器代码从不这样做;相反,它(正确地)仅在套接字选择为准备写入后调用send(),并且套接字仅在其传出数据缓冲区中有空间时才选择准备写入。
    • 我发现了问题...服务器的循环在select() 上阻塞,因为我用 5 秒超时调用它。感谢您的有用意见
    猜你喜欢
    • 2016-03-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-12-16
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多