【问题标题】:High speed buffering in C++C++中的高速缓冲
【发布时间】:2017-05-02 02:31:21
【问题描述】:

我一直在编写一个工具,它可以从 SDR 设备高速接收缓冲区(每秒 1000 万个复杂样本(样本是短类型))。但是对于我编写的代码,每当我回顾所写的内容时,都会丢失小块。

我试图缓解这个问题的方法是使用两个大小相同的缓冲区并在它们之间交换以避免丢失任何样本。每当我完成交换缓冲区并将样本卸载到后台缓冲区(其大小是采样率的 3 倍)以及如果需要调用新线程将新数据写入磁盘的过程时,这些块就会丢失。

SDR 设备本身将自己的内部缓冲区大小宣传为像 2016 一样奇怪的东西,它提供了两个指针,指向样本的实数和虚数数组。显然,我想在这个采样率下避免这种小数组的开销,所以通过实现更大尺寸的交换缓冲区,比如 65536,希望我可以避免这些问题,但无济于事。

我已经指出问题最有可能出现在回调函数上,因为当我减小交换缓冲区的大小时,丢失的块变得更加频繁。

我是不是走错了路,还是我的解决方案中缺少更明显的东西,或者我写的东西不正确?

我尽可能避免使用标准库,因为它对于这种数据速度来说太慢了,因此需要 memmove 和 memcpy。唯一的例外是缓冲区指针交换和创建线程。

交换缓冲区的实现方式为:

    IQType<short>* bufferA;
    IQType<short>* bufferB;

IQType 是:

template <class T> class IQType {
public:
        T inPhaseValue;
        T quadraturePhaseValue;

        IQType() : inPhaseValue(0), quadraturePhaseValue(0){};
        IQType(T i, T q) : inPhaseValue(i), quadraturePhaseValue(q){};
};

卸载SDR样本数据的SDR设备回调函数:

void MiricsDataSource::newSamplesCallBack(short *xi, short *xq, unsigned int firstSampleNum, int grChanged, int rfChanged, int fsChanged, unsigned int numSamples, unsigned int reset, void *cbContext) {

    MiricsDataSource* mirCtx = static_cast<MiricsDataSource*>(cbContext);

    for (int i = 0; i < numSamples; ++i)
    {
        mirCtx->bufferA[mirCtx->bufferCount] = IQType<short>(xi[i],xq[i]);
        mirCtx->bufferCount++;
        if(mirCtx->bufferCount == mirCtx->bufferSize-1) {
            std::swap(mirCtx->bufferA,mirCtx->bufferB);
            mirCtx->owner->backBuffer->write(mirCtx->bufferB,mirCtx->bufferSize);
            mirCtx->bufferCount = 0;
        }
    }
}

BackBuffer 写入和相关的 t_write 函数:

void BackBuffer::write(const IQType<short>* buff, size_t bLength) {
    std::thread dumpThread(&BackBuffer::t_write,this,buff,bLength);
    dumpThread.detach();
}

void BackBuffer::t_write(const IQType<short>* buff, size_t bLength) {
    std::lock_guard<std::mutex> lck (bufferMutex);
    memmove(&backBuffer[0],(&backBuffer[0])+bLength,(sizeof(IQType<short>*)*(length-bLength)));
    memcpy(&backBuffer[length-bLength],buff,(sizeof(IQType<short>*)*(bLength)));
    if(dumpToFile) {
        IQType<short>* toWrite = new IQType<short>[bLength];
        memcpy(toWrite,buff,(sizeof(IQType<short>*)*(bLength)));
        strmDmpMgr->write(toWrite,bLength);
    }
}

【问题讨论】:

  • I have avoided the standard library as much as possible simply because it is too slow for this kind of data speed hence the need for memmove and memcpy 当我清楚地看到标准库只是执行 memcpy/memmove 时,我只是不买它,而类型就像你的情况一样微不足道。如果您确实没有测量它,请删除该声明。
  • “我尽可能避免使用标准库 [...] 因此需要 [标准库中的函数]”似乎有点矛盾。
  • 最初我使用 std::rotate 来移动后台缓冲区,但这需要 5 秒来存储 3000 万个样本。 memmove 这样做的速度要快数百倍。 Justy 澄清我对后台缓冲区中的数据做其他事情,而不是记录。后备缓冲区还用于观察记录数据的最后 3 秒。它的处理方式与队列类似,但可以选择查看任何位置和任何长度,直到后备缓冲区的大小
  • @Gelion rotate 根本不是memove 的标准替代品!它所做的工作比您需要的要多得多。
  • 那么为什么你需要两个缓冲区并交换恶作剧而不是仅仅实现一个足够大的环形缓冲区呢?也不应该需要任何锁定和多个动态线程,我们这里只讨论 20MB/s,只需要一个读取器和写入器线程就可以了。

标签: c++ multithreading stream buffer


【解决方案1】:
  1. 最大的代价是在BackBuffer::write 中产生一个线程。不要这样做,只需运行一个持久后台线程并向其发送消息。

  2. 在您当前的设置中存在损坏输出缓冲区的风险(在线程完成第一个缓冲区之前填充第二个缓冲区允许您再次开始覆盖第一个缓冲区)。您可以处理任意数量的缓冲区,只需使用一个完整缓冲区队列和一个空缓冲区队列在线程之间循环它们。

    如果您低于某个最低级别的空闲缓冲区,则让后台线程负责创建新缓冲区,以使动态分配远离关键循环。

  3. 正如 Voo 所说,直接读取大缓冲区(并避免中间的 memcpy 等)仍然更简单。与缓冲区列表方法相比,它的弹性确实要小一些,但在这里你并不需要这种灵活性。

还有一些较小的优化(例如,不要在每次迭代时通过间接方式增加缓冲区计数,只需存储一次正确的值),但该线程是主要问题。

【讨论】:

    【解决方案2】:

    一个可能的来源是您为每个“数据转储”创建一个新线程。根据缓冲区大小,您可以每秒创建数千个线程,这可能导致严重的性能下降,不仅是您的程序,而且是整个计算机。创建单个线程是一项昂贵的操作,更不用说操作必须在所有线程之间循环加上系统上的所有其他线程。

    相反,我建议采用不同的设计,在其中您有一个已经运行的线程池(搜索c++ thread pools),您要求转储缓冲区。然后你可以有一个环形的缓冲区,一个用于每个线程加上一个你当前正在写入的地方。

    【讨论】:

    • 我刚刚检查了它使用了多少线程,它保持在 15 左右(根据 windows)。有没有更准确的方法来查看产生了多少?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-09-26
    • 1970-01-01
    相关资源
    最近更新 更多