【问题标题】:Reading more than one "message" from recv()从 recv() 读取多个“消息”
【发布时间】:2019-08-06 06:19:00
【问题描述】:

完全有可能使用 recv() 调用将多个单独的“消息”(例如 2 个 send()s)读入缓冲区。

在这种情况下,一旦您意识到缓冲区中的数据比您需要的多,您将如何将第二条消息放回 recv() 缓冲区?

例如,

所有消息都以指定长度的字节开头。我需要继续接收,直到将正确数量的字节读入缓冲区,但不要继续接收。

一个想法是执行一次 recv() 来确定消息长度,然后创建一个具有该大小的缓冲区。我不知道不适合缓冲区的数据会发生什么。

【问题讨论】:

  • 这是您必须在应用程序级别处理的问题,您需要自己进行缓冲。但是如果你有“消息”的长度,那么你只需要在一个循环中调用recv(尺寸越来越小)直到所有的消息都被接收到,这样就简单多了。
  • 我想知道这是否可行,但长度不应该是分配给缓冲区的大小吗?
  • 诸如 TCP 之类的流协议传输字节流而不考虑您的“消息”边界。
  • @19172281 您对所有答案都不感兴趣? (What should I do when someone answers my question?)

标签: c++ c sockets tcp


【解决方案1】:

如果您想要接收固定尺寸,您可以执行以下操作:

ssize_t recv_all(int socket, char *buffer_ptr, size_t bytes_to_recv)
{
    size_t original_bytes_to_recv = bytes_to_recv;

    // Continue looping while there are still bytes to receive
    while (bytes_to_recv > 0)
    {
        ssize_t ret = recv(socket, buffer_ptr, bytes_to_recv, 0);
        if (ret <= 0)
        {
            // Error or connection closed
            return ret;
        }

        // We have received ret bytes
        bytes_to_recv -= ret;  // Decrease size to receive for next iteration
        buffer_ptr += ret;     // Increase pointer to point to the next part of the buffer
    }

    return original_bytes_to_recv;  // Now all data have been received
}

简单地用作

// Somewhere above we have received the size of the data to receive...

// Our data buffer
char buffer[the_full_size_of_data];

// Receive all data
recv_all(socket, buffer, sizeof buffer);  // TODO: Add error checking

[请注意,我使用诸如ssize_tint 之类的POSIX 类型作为套接字。修改以适合您的系统(例如,SOCKET 用于 Windows 上的套接字)。]

【讨论】:

  • 我喜欢这种方法,但是长度不应该是缓冲区的大小吗?
  • @19172281 缓冲区的长度被传递给函数。然后在循环中使用 remaining 缓冲区的 reaming 长度。
【解决方案2】:

你不会“放回去”。相反,定义什么构成完整的消息并实现一个缓冲区,从套接字读取固定数量并仅提取完整的消息。

例如,下面的类将调用recv,直到找到标记字节(在本例中为换行符),然后只返回减去标记的消息(UTF-8 编码的字符串)。缓冲区中的任何剩余数据都会保存并在下一次get_msg 调用时处理:

from socket import *

class SocketBuffer:
    def __init__(self,sock):
        self.sock = sock
        self.buffer = b''

    def get_msg(self):
        # Buffer data until a newline is found.
        while b'\n' not in self.buffer:
            data = self.sock.recv(1024)
            if not data:
                return b'' # drops partial messages...should check and raise error instead
            self.buffer += data
        # split off the message bytes from the buffer.
        msg,_,self.buffer = self.buffer.partition(b'\n')
        return msg.decode()

【讨论】:

    【解决方案3】:

    一旦您意识到缓冲区中的数据比您需要的多,您将如何将第二条消息放回 recv() 缓冲区?

    只是不要将第二条消息从 recv() 缓冲区中取出,我看到了两种方法:

    1) 先做

    ssize_t size = recv(sockfd, buf, len, MSG_PEEK | MSG_TRUNC);
    
    • MSG_TRUNC(仅限 AF_PACKET)为您提供可用数据的实际大小,而不是截断为 len
    • 的可能长度
    • 使用 MSG_PEEK,接收的数据不会从队列中删除。

    这允许您分析 peek 数据和

    • 如果它是第一条消息的子部分,但不是您阅读(而不是偷看)它的结尾recv(sockfd, buf, size);,那么您重做之前的 recv 等
    • 如果您有第一条消息的(结尾)并且可能是第二条消息的一部分,那么您知道 subSize 您需要阅读并执行recv(sockfd, buf, subSize);,而您的第二条消息仍在可用于下一个 revc

    当然,每次您阅读第一条消息的子部分时,指针 buf 都会前进到不重写已阅读的部分。

    使用malloc 然后realloc 增加接收第一条消息的缓冲区大小

    2) 一种非常常见的方式,它在消息本身之前发送消息的大小,允许接收者首先读取大小,然后在循环中读取数据,直到读取所有消息。如果消息大于 255 字节,为了与 little/big endian 兼容,请使用 htons/htonl/ntohs/ntohl 作为大小

    我不知道不适合缓冲区的数据会发生什么。

    如果您说要写出缓冲区,因为行为未定义,所以没人知道,如果您有机会遇到分段错误,这与严重的内存损坏相反,其影响很晚才可见。但是正如您在上面的两个解决方案中看到的那样,幸运的是这种情况没有发生


    以第一种情况为例,使用TCP/IP(没有MSG_TRUNC那么),空格表示每个缓冲区的结束(但我不逐字符读取,以兼容更复杂的缓冲区确定结束)。

    服务器在参数中获取要发送的字符串,每个参数在一个发送中发送,无论它是否包含空格,最后一个参数的最后一个字符必须成为一个空间。

    客户端获取一个参数,即每次(尝试)读取的大小,它会打印每个“peek”缓冲区(用于调试)和每个缓冲区。

    server.c

    #include <stdio.h>
    #include <string.h>
    #include <unistd.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <errno.h>
    
    int main(int argc, char ** argv)
    {
      errno = 0;
    
      int ssock = socket(AF_INET, SOCK_STREAM, 0);
    
      if (ssock == -1) {
        perror("socket()");
        return -1;
      }
    
      int reuse = 1;
      if (setsockopt(ssock, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) == -1) {
        perror("setsockopt() SO_REUSEADDR)");
        return -1;
      }
    
    #ifdef SO_REUSEPORT
      if (setsockopt(ssock, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) == -1) {
        perror("setsockopt() SO_REUSEPORT)");
        return -1;
      }
    #endif
    
      struct sockaddr_in ssin = { 0 };
    
      ssin.sin_addr.s_addr = htonl(INADDR_ANY);
      ssin.sin_port = htons(1024);
      ssin.sin_family = AF_INET;
    
      if(bind (ssock, (struct sockaddr*) &ssin, sizeof(ssin)) == -1)
      {
        perror("bind()");
        return -1;
      }
    
      if(listen(ssock, 1) == -1)
      {
        perror("listen()");
        return -1;
      }
    
      struct sockaddr_in csin = { 0 };
      socklen_t csz = sizeof(csin);
      int csock = accept(ssock, (struct sockaddr*) &csin, &csz);
    
      if (csock == -1) {
        perror("accept()");
        return -1;
      }
    
      for (int i = 1; i < argc; ++i) {
        if (send(csock, argv[i], strlen(argv[i]), 0) == -1) {
          char s[32];
    
          sprintf(s, "send %i", i);
          perror(s);
        }
      }
    
      close(csock);
      close(ssock);
    }
    

    client.c

    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <errno.h>
    #include <string.h>
    
    int main(int argc, char ** argv)
    {
      if (argc != 2) {
        printf("Usage : %s <length>\n", *argv);
        return 0;
      }
    
      int len;
      char c;
    
      if ((sscanf(argv[1], "%d%c", &len, &c) != 1) && (len < 1)) {
        fprintf(stderr, "invalid length\n");
        return -1;
      }
    
      errno = 0;
    
      int sock = socket(AF_INET, SOCK_STREAM, 0);
    
      if (sock == -1) {
        perror("socket()");
        return -1;
      }
    
      struct sockaddr_in sin = { 0 };
    
      sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
      sin.sin_port = htons(1024);
      sin.sin_family = AF_INET;
    
      if(connect (sock, (struct sockaddr*) &sin, sizeof(sin)) == -1)
      {
        perror("connect()");
        return -1;
      }
    
      for (;;) {
        size_t ln = len;
        char * buf = malloc(ln + 1);
    
        if (buf == NULL) {
          fprintf(stderr, "cannot malloc");
          break;
        }
    
        size_t off = 0;
    
        for (;;) {
          ssize_t sz = recv(sock, buf + off, len, MSG_PEEK); /* no MSG_TRUNC : AF_INET */
    
          if (sz <= 0) {
            free(buf);
            close(sock);
            return -1;
          }
    
          buf[off + sz] = 0;
    
          /* debug */
          printf("peek '%s'\n", buf + off);
    
          char * p = strchr(buf + off, ' ');
    
          if (p != NULL) {
            recv(sock, buf + off, p - buf - off + 1, 0);
            *p = 0;
            printf("full buff is '%s'\n", buf);
            free(buf);
            break;
          }
    
          recv(sock, buf + off, sz, 0);
          off += sz;
          ln += sz;
          buf = realloc(buf, ln + 1);
    
          if (buf == NULL) {
            fprintf(stderr, "cannot malloc");
            break;
          }
        }
      }
    
      close(sock);
    }
    

    编译和执行:

    pi@raspberrypi:~ $ gcc -pedantic -Wextra server.c -o se
    pi@raspberrypi:~ $ gcc -g -pedantic -Wextra client.c -o cl
    pi@raspberrypi:~ $ ./se "123 456 78901234567" "8 1 " &
    [1] 11551
    pi@raspberrypi:~ $ ./cl 5
    peek '123 4'
    full buff is '123'
    peek '456 7'
    full buff is '456'
    peek '78901'
    peek '23456'
    peek '78 1 '
    full buff is '789012345678'
    peek '1 '
    full buff is '1'
    [1]+  Fini                    ./se "123 456 78901234567" "8 1 "
    pi@raspberrypi:~ $ 
    

    valgrind 下的执行(在单独的终端中):

    pi@raspberrypi:~ $ valgrind ./se "123 456 78901234567" "8 1 " 
    ==11602== Memcheck, a memory error detector
    ==11602== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
    ==11602== Using Valgrind-3.13.0 and LibVEX; rerun with -h for copyright info
    ==11602== Command: ./se 123\ 456\ 78901234567 8\ 1\ 
    ==11602== 
    ==11602== 
    ==11602== HEAP SUMMARY:
    ==11602==     in use at exit: 0 bytes in 0 blocks
    ==11602==   total heap usage: 0 allocs, 0 frees, 0 bytes allocated
    ==11602== 
    ==11602== All heap blocks were freed -- no leaks are possible
    ==11602== 
    ==11602== For counts of detected and suppressed errors, rerun with: -v
    ==11602== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 6 from 3)
    
    pi@raspberrypi:~ $ valgrind ./cl 5
    ==11604== Memcheck, a memory error detector
    ==11604== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
    ==11604== Using Valgrind-3.13.0 and LibVEX; rerun with -h for copyright info
    ==11604== Command: ./cl 5
    ==11604== 
    peek '123 4'
    full buff is '123'
    peek '456 7'
    full buff is '456'
    peek '78901'
    peek '23456'
    peek '78 1 '
    full buff is '789012345678'
    peek '1 '
    full buff is '1'
    ==11604== 
    ==11604== HEAP SUMMARY:
    ==11604==     in use at exit: 0 bytes in 0 blocks
    ==11604==   total heap usage: 8 allocs, 8 frees, 1,081 bytes allocated
    ==11604== 
    ==11604== All heap blocks were freed -- no leaks are possible
    ==11604== 
    ==11604== For counts of detected and suppressed errors, rerun with: -v
    ==11604== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 6 from 3)
    

    【讨论】:

    • MSG_PEEK 没有你赋予它的忽略len 参数的神秘属性,并且 TCP 中的 MSG_TRUNC 阻止数据被复制到缓冲区中。真的没多大用处。
    • @user207421 谢谢你的评论。请注意 OP 没有谈论协议,无论如何你对 MSG_TRUNC 是正确的,我添加了一个关于它的精确度。抱歉,我不明白你所说的 MSG_PEEK 是什么意思。但是,我还编辑了我的答案,以使用第一种方式(第二种方式很简单)举一个例子,包括执行。
    猜你喜欢
    • 1970-01-01
    • 2012-06-14
    • 2019-04-15
    • 2013-12-31
    • 1970-01-01
    • 2017-01-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多