【问题标题】:python sockets multiple messages on same connectionpython在同一个连接上连接多条消息
【发布时间】:2017-07-02 12:32:55
【问题描述】:

我的问题更笼统而不是具体。我想实现一个简单的客户端服务器应用程序,只是为了将 消息 从客户端传递到服务器并从服务器获得确认。

我想知道在使用套接字时我必须考虑什么,我是否必须实现自己的通信接口并管理同一连接上的消息传递或为每条消息创建一个新连接?

(请假设现在的消息小于 BUFFER_SIZE)

代码是这样的:

server.py

server_info = (HOST, PORT)
sock = socket.socket(family=AF_INET, type=SOCK_STREAM)
sock.bind(server_info)
sock.listen(NUMBER_OF_SOCKETS)
try:
    while True:
        connection, client_address = sock.accept()
        try:
            while True:
                data = connection.recv(BUFFER_SIZE)
                print('message received: {data}'.format(data=data))
                connection.send("ok")
        finally:
            connection.close()

client.py

server_info = (HOST, PORT)
sock = socket.socket(family=AF_INET, type=SOCK_STREAM)
sock.connect(server_info)
try:
    print("connection established")
    while True:
        print("Please enter a message you want to pass to the server")
        msg = raw_input()

        print('sending "{message}"'.format(message=msg))
        sock.send(msg)

        while True:
            data = sock.recv(constants.BUFFER_SIZE)
            print('received "{data}"'.format(data=data))
            break

finally:
    print('closing socket')
    sock.close()

此代码使我能够在服务器端接收多条消息并从客户端发送多条消息。这是正确的方法吗?为了做到这一点,我必须在客户端进行 2 个无限循环,那么关闭连接呢?当我发送一条 0 字节的消息时,服务器和客户端都卡住了。

非常感谢!

【问题讨论】:

  • 您通常不会实现自己的客户端-服务器协议,而是使用 Web 服务器。例如Apache httpd with WSGI.
  • 我并不真正想要一个 Web 服务器,我只想在 2 个主机之间进行通信,发送和接收消息。不过,我不是在寻找替代品。
  • Web 服务器的用途远不止网页。后台消息传递是通过让 Web 服务器向请求提供 JSON 或 XML 回复来完成的。
  • 我了解先生,但我仍然想得到上述问题的答案,而不是替代方案。谢谢

标签: python sockets


【解决方案1】:

在双向通信中,默认情况下,客户端可以知道何时完成发送,但无法知道是否完成接收。而且,服务器也无法知道客户端是否发送完毕。

代码:

def recv_end(the_socket):
    End='SERVER WRONG MARKER'
    total_data=[];data='';got_end=False
    while True:
            data=the_socket.recv(8192)
            if not data: break
            if End in data:
                total_data.append(data[:data.find(End)])
                got_end=True
                break
            total_data.append(data)
            if len(total_data)>1:
                #check if end_of_data was split
                last_pair=total_data[-2]+total_data[-1]
                if End in last_pair:
                    total_data[-2]=last_pair[:last_pair.find(End)]
                    total_data.pop()
                    got_end=True
                    break
    return (got_end,''.join(total_data))

def basic_server(sock):
    got=[]
    got_end,data = recv_end(sock)
    if not got_end:  
        sock.send('ERROR:no end!') #<--- not possible w/close()
    else: sock.sendall(data*2)
    sock.shutdown(1)
    sock.close()

import socket
Port=4444
def start_server():
    sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sock.bind(('',Port))
    sock.listen(5)
    print 'started on',Port
    while True:
        newsock,address=sock.accept()
        basic_server(newsock)

def send_data(data):
    sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sock.connect(('localhost',Port))
    print 'connected'
    sock.sendall(data+'CLIENT WRONG MARKER')
    print 'sent',data
    sock.shutdown(1)
    print 'shutdown'
    result=[]
    while True:
       got=sock.recv(2)
       if not got: break
       result.append(got)
    sock.close()
    return ''.join(result)

if __name__=='__main__':
    start_server()

您可以做一些事情,例如在数据前面放置一个字节数,或者有一个结束标记,这样服务器就可以知道它是否获得了所有字节。

但是,这会带来一个问题。如果字节数错误或结束标记永远不会到达怎么办?使用socket.close(),服务器无法告诉客户端,“奇怪。您已完成向我发送数据,但我没有收到所有数据”,因为客户端完成发送后客户端连接并未保持打开状态。

使用socket.shutdown(1) 服务器仍然可以告知客户端出现问题并采取适当的措施。

关机命令有三个选项0 = done receiving, 1 = done sending, 2 = both

在上面的代码中重点关注 1、去掉close操作中的隐含发送。请注意在 send_data 中关闭操作是如何(相对)远离关闭的。这允许服务器告诉客户端任何离别注释。

只需运行代码即可启动服务器。出于演示目的,服务器设置为一次仅接收 2 字节s(应该类似于 8192)。要将数据发送给它,请导入它(称为 shut_srv 或其他)并为客户端调用 send_data。

data=('a1234','b1234','c1234','d1234','e1234') for d in data: print shut_srv.send_data(d)

您将收到类似的回复:connected sent a1234 shutdown ERROR:no end! connected sent b1234 shutdown ERROR:no end! connected sent c1234 shutdown ERROR:no end! connected sent d1234 shutdown ERROR:no end! connected sent e1234 shutdown ERROR:no end!

如果您使标记相同。回复应该是:connected sent a123456789 shutdown a1234a1234 connected sent b1234 shutdown b1234b1234 connected sent c1234 shutdown c1234c1234 connected sent d1234 shutdown d1234d1234 connected sent e1234 shutdown e1234e1234

【讨论】:

    【解决方案2】:

    添加两种服务器-客户端,一种是多进程,另一种是异步的,它们做的几乎一样,异步的更健壮,请在此处阅读原因: Threads vs. Async.

    我的例子: 使用多进程:

    import multiprocessing
    import socket
    import time
    
    HOST = "0.0.0.0"
    PORT = 9000
    
    
    def handle(connection, address):
    
        try:
            while True:
                data = connection.recv(1024)
                connection.sendall(data + ' server time {}'.format(time.time()))
        except:
            pass
        finally:
            connection.close()
    
    
    class Server(object):
    
        def __init__(self, hostname, port):
            self.hostname = hostname
            self.port = port
    
        def start(self):
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.bind((self.hostname, self.port))
            self.socket.listen(1)
    
            while True:
                conn, address = self.socket.accept()
                process = multiprocessing.Process(
                    target=handle, args=(conn, address))
                process.daemon = True
                process.start()
    
    
    if __name__ == "__main__":
        server = Server(HOST, PORT)
        try:
            print 'start'
            server.start()
        except:
            print 'something wrong happened, a keyboard break ?'
        finally:
            for process in multiprocessing.active_children():
                process.terminate()
                process.join()
        print 'Goodbye'
    

    还有它的客户:

        import sys
    import threading
    import time
    import socket
    
    SOCKET_AMOUNT = 100
    HOST = "localhost"
    PORT = 9000
    
    
    def myclient(ip, port, message):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((ip, port))
        sock.sendall(message)
        result = sock.recv(1024)
        print result + ' final clnt time {}'.format(time.time())
        sock.close()
    
    if __name__ == "__main__":
        thread_list = []
        for i in range(SOCKET_AMOUNT):
            msg = "Thread #{}, clnt time {}".format(i, time.time())
            client_thread = threading.Thread(
                target=myclient, args=(HOST, PORT, msg))
            thread_list.append(client_thread)
            client_thread.start()
    
        waiting = time.time()
        [x.join() for x in thread_list]
        done = time.time()
        print 'DONE {}. Waiting for {} seconds'.format(done, done-waiting)
    

    下一个服务器更强大!数据不会丢失!!! 服务器:

    import asyncore
    import socket
    import time
    import logging
    import json
    
    
    class Server(asyncore.dispatcher):
    
        def __init__(self, host, port):
    
            self.logger = logging.getLogger('SERVER')
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.set_reuse_addr()
            self.bind(('', port))
            self.listen(confjson.get('SERVER_QUEUE_SIZE', None))
            self.logger.debug('binding to {}'.format(self.socket.getsockname()))
    
        def handle_accept(self):
            socket, address = self.accept()
            self.logger.debug('new connection accepted')
            EchoHandler(socket)
    
    
    class EchoHandler(asyncore.dispatcher_with_send):
    
        def handle_read(self):
    
            msg = self.recv(confjson.get('RATE', None))
            self.out_buffer = msg
            self.out_buffer += ' server recieve: {}'.format(time.time())
            if not self.out_buffer:
                self.close()
    
    
    if __name__ == "__main__":
    
        logging.basicConfig(level=logging.DEBUG,
                            format='%(name)s: %(message)s',
                            )
        with open('config.json', 'r') as jfile:
            confjson = json.load(jfile)
        try:
            logging.debug('Server start')
            server = Server(confjson.get('HOST', None),
                            confjson.get('PORT', None))
            asyncore.loop()
        except:
            logging.error('Something happened,\n'
                          'if it was not a keyboard break...\n'
                          'check if address taken, '
                          'or another instance is running. Exit')
        finally:
            logging.debug('Goodbye')
    

    还有异步客户端:

    import asyncore
    import socket
    import time
    import logging
    import json
    
    
    class Client(asyncore.dispatcher_with_send):
    
        def __init__(self, host, port, message, pk):
            self.logger = logging.getLogger('CLIENT')
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.host = host
            self.port = port
            self.connect((host, port))
            self.out_buffer = message
            self.clientID = pk
            self.logger.debug('Connected #{}'.format(self.clientID))
    
        def handle_close(self):
            self.close()
    
        def handle_read(self):
            rec_msg = self.recv(confjson.get('RATE', None))
            self.logger.debug('#{}, {} back at client {}'.format(self.clientID,
                                                                 rec_msg,
                                                                 time.time()
                                                                 )
                              )
            self.close()
    
    
    if __name__ == "__main__":
        logging.basicConfig(level=logging.DEBUG,
                            format='%(name)s: %(message)s',
                            )
    
        with open('config.json', 'r') as jfile:
            confjson = json.load(jfile)
        clients = []
        for idx in range(confjson.get('SOCKET_AMOUNT', None)):
            msg = "Start: {}".format(time.time())
            clients.append(Client(confjson.get('HOST', None),
                                  confjson.get('PORT', None),
                                  msg,
                                  idx)
                           )
        start = time.time()
        logging.debug(
            'Starting async loop for all connections, unix time {}'.format(start))
        asyncore.loop()
        logging.debug('{}'.format(time.time() - start))
    

    还有一个小的配置文件:

    {
        "HOST": "127.0.0.1",
        "PORT": 5007,
        "RATE": 8096,
        "SERVER_QUEUE_SIZE": 16,
        "SOCKET_AMOUNT": 100
    }
    

    【讨论】:

    • 问题,异步代码的计算密集度如何?它会在像 Raspberry Pi 这样较弱的系统上运行良好吗?
    • IDK,请尝试并分享结果。这也是服务器任务的问题。
    • 所以我不得不稍微调整一下代码,因为 python 3.6.5 不允许传输字符串。现在必须对它们进行编码/解码才能发送信号。但是在将 RPi 连接到 Windows 10 python 时它工作得非常好
    • 在第一个版本(非异步)中,为什么要让服务器多进程而不是多线程,也就是每个客户端一个线程?
    • 你说异步版本更健壮,因为它不会丢失数据。第一个“服务器”模块..命名空间不是拥有子流程实例而不是 Server() 实例吗?因此操作系统会知道进程(又名 Python.exe)及其子子进程,但操作系统不会知道某些 Python Server() 实例管理每个客户端连接的“子进程”。如果句柄子进程无法启动,您如何丢失数据?有趣的是,我认为您的第一台服务器将客户端与彼此的通信隔离开来,因为每个客户端都在自己的进程中。
    【解决方案3】:

    我有同样的问题,试试这个服务器:

    import socket
    import select
    open_client_sockets = []
    server_socket = socket.socket()
    server_socket.bind(('0.0.0.0', 8001))
    
    server_socket.listen(1)
    (new_socket1, address1) = server_socket.accept()
    open_client_sockets.append(new_socket1)
    
    while True:
        rlist, wlist, xlist = select.select(open_client_sockets, open_client_sockets, [])
        for current_socket in rlist:
            data = current_socket.recv(4096)
            if data != '':
                print "given data: ", str(data)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-03-07
      • 2023-03-06
      • 2020-04-20
      • 1970-01-01
      • 2021-10-08
      相关资源
      最近更新 更多