【问题标题】:Rearrangement of packets using pthreads使用 pthread 重新排列数据包
【发布时间】:2013-11-08 20:44:31
【问题描述】:

所以这是我第一次在这里提出问题,尽管我已经使用这个网站很多年了!

我的问题有点棘手。我正在尝试开发一个用于发送大文件的客户端服务器应用程序,使用带有我自己的错误检查和流量控制的 UDP。现在,我开发了一个功能齐全的服务器和客户端。客户端请求特定文件,服务器开始发送。文件被部分读入缓冲区,以避免每次发送数据包时都必须读取文件的一小部分,从而节省处理时间。数据包由 1400 字节的实际数据 + 28 字节的标头(序列号、确认号、校验和等)组成。

所以我掌握了基础知识,一个简单的停止等待协议。在发送下一个数据包之前发送数据包并接收确认。

为了能够实现更智能的流控制算法,对于只有一些窗口的初学者,我必须在两个不同的线程中运行发送部分和接收确认部分。现在这是我遇到问题的地方。这是我第一次使用线程,所以请多多包涵。

我的问题是从客户端的数据包写入的文件已损坏。好吧,当用一个小的 jpg 文件测试时,文件只有 50% 的时间损坏,当用 MP4 文件测试时,它总是损坏!所以我想也许线程以某种方式重新排列了数据包的发送顺序?我用的是序列号,所以问题一定是在给数据包分配序列号之前出现的……

我确定我拆分文件的部分是正确的,并且我在客户端重新组装它的部分也是正确的,因为我在尝试实现线程之前已经对此进行了测试。还应该注意的是,我将代码的确切发送部分复制到了发送线程中,这在将其放入线程之前也能完美运行。这也是为什么我只发布我的线程部分的原因代码,因为这显然是造成问题的原因(并且因为项目的整个代码会占用大量空间)

我的发送线程代码:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condition_var = PTHREAD_COND_INITIALIZER;

static void *send_thread(void *){
if (file.is_open()) {
    while(!file.reachedEnd()){
        pthread_mutex_lock(& mutex);

        if(seq <= upperwindow) {

            int blocksize = file.getNextBlocksize();
            senddata = new unsigned char[blocksize + 28];
            Packet to_send;
            to_send.data = new char[blocksize];
            to_send.sequenceNumber = seq;
            to_send.ackNumber = 0;
            to_send.type = 55; // DATA

            file.readBlock(*to_send.data);
            createPacket(senddata, to_send, blocksize + 28);
            if (server.sendToClient(reinterpret_cast<char*>(senddata), blocksize + 28) == -1)
                perror("sending failed");

            incrementSequenceNumber(seq);

            /* free memory */
            delete [] to_send.data;
            delete [] senddata;

        }
        pthread_mutex_unlock(& mutex);
    }
    pthread_exit(NULL);
} else {
    perror("file opening failed!");
    pthread_exit(NULL);
}
}

我的接收确认线程代码:

static void *wait_for_ack_thread(void *){
while(!file.reachedEnd()){
    Packet ack;

    if (server.receiveFromClient(reinterpret_cast<char*>(receivedata), 28) == -1) {
        perror("error receiving ack");
    } else {
        getPacket(receivedata, ack, 28);
        pthread_mutex_lock(& mutex);
        incrementSequenceNumber(upperwindow);

        pthread_mutex_unlock(& mutex)

    }
}
pthread_exit(NULL);
}

非常感谢所有cmets! :)

编辑: 增加readBlock函数代码:

void readBlock(char & in){

memcpy(& in, buffer + block_position, blocksize);
block_position = block_position + blocksize;
if(block_position == buffersize){
    buf_position ++;
    if(buf_position == buf_reads){
        buffersize = filesize % buffersize;
    }
    fillBuffer();
    block_position = 0;
}
if(blocksize < MAX_DATA_SIZE){
    reached_end = true;
    return;
}
if((buffersize - block_position) < MAX_DATA_SIZE){
    blocksize = buffersize % blocksize;

}

}

【问题讨论】:

  • 发送循环中互斥锁的可能点是什么,当 整个 主体被锁在它下面时,特别是当 nothing 完成时当 (seq &lt;= upper_window) 评估为假时更改退出条件?没有senddata 的声明,但我认为它是unsigned char*。这个:file.readBlock(*to_send.data) 看起来也很可疑。如果没有 actualreal 代码,则纯粹是猜测真正的问题是什么。
  • 你为什么不试试把std::cout打印每笔交易的序号,以便发送和接收,这样你就可以看到数据包是否按顺序发送。
  • 互斥体只包围了 if-body,因为 if-condition 中的变量 upperwindow,它也在另一个线程中被更新。但是我不确定是否有必要?是的,senddata 是一个无符号字符*。 readBlock 函数将数据块读取到 char 数组中,它应该可以正常工作,因为它之前已经测试过很多次(没有线程),这就是为什么我只发布线程代码..
  • @stellarossa 我已经试过了,而且序列号似乎按顺序通过..
  • 你检查收到的 ACK 是否有正确的序列号?因为如果您只是在收到正确的数据包时从客户端发送常量“ACK”,您最终可能会收到重复的 ACK 并为相同的数据增加两次(或更多)的序列号。我建议在客户端和服务器上都使用 wireshark 或 tcpdump 并比较结果。另外:“to_send.ackNumber”有什么作用?它似乎总是设置为零,所以。

标签: c++ multithreading sockets pthreads fstream


【解决方案1】:

创建一个表示通信状态的数组。

0 表示未发送,或发送和接收方报告错误。 1 表示发送。 2 表示已发送,确认已收到。

分配这个数组,并用互斥锁保护对它的访问。

发送线程在数组中保留两个指针——“已发送至”和“应该发送下一个”。这些归发送线程所有。

ack 线程只是简单地获取 ack 数据包,锁定数组,然后进行状态转换。

发送线程锁定数组,检查它是否可以推进“已发送到”指针(或者它是否应该重新发送旧的东西)。如果它发现一个错误,它会减少“应该下一个发送”指针指向它。

然后它会查看它是否应该接下来发送内容。如果应该,它将节点标记为“正在发送”,解锁数组并发送它。

如果发送线程没有工作,并且没有发现任何事情要做,它会在超时时进入睡眠状态,并且可能会被 ack 线程“踢醒”。

现在,请注意,客户端可能会以错误的顺序获取由此发送的数据包,除非您将其限制为只有 1 个数据包在传输中。

连接状态数组不必是字面量数组,但如果你先从它开始,然后再进行优化,会更容易。

在接收端,您必须注意序列号,因为数据包可能会乱序到达。为了测试这一点,编写一个服务器,故意以错误的顺序发送数据包,并确保客户端设法将它们正确地拼接在一起。

【讨论】:

  • 感谢您的回答!我一定会尝试实现这个连接状态数组,非常感谢。无论如何,我打算实现某种数据结构来跟踪已发送和确认的内容等,只是想让基本的发送/接收线程首先运行,因为我是线程新手。
  • @user1419999 了解:问题是您的代码 (A) 长时间持有锁,并且 (B) 没有在每个线程需要的线程之间传输数据。如果您不知道正在确认什么,那么计数确认的数量相对没用!
猜你喜欢
  • 1970-01-01
  • 2019-04-13
  • 2010-09-20
  • 2019-11-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-11-12
  • 2017-10-17
相关资源
最近更新 更多