【问题标题】:Asynchronously writing to a file in c++ unix在 c++ unix 中异步写入文件
【发布时间】:2014-01-15 00:31:53
【问题描述】:

我有一些很长的循环,我需要在每次迭代时将一些数据写入文件。问题是写入文件可能很慢,所以我想通过异步写入来减少所需的时间。

有没有人知道这样做的好方法?我是否应该通过写出来创建一个线程来消耗放入其缓冲区的任何内容(在这种情况下,单个生产者,单个消费者)?

我主要对除了标准库 (C++11) 之外不涉及任何内容的解决方案感兴趣。

【问题讨论】:

  • 它不是标准库的一部分,但如果您最终不喜欢标准库解决方案,您应该查看libuv
  • @TaylorFlores:谢谢!我会调查一下,但乍一看,它看起来比我需要的要多得多。
  • 你现在用什么函数来读写?如果您还没有使用缓冲 I/O 的 stdio 库,请尝试一下。如果是,您可以尝试调用 setvbuf 来增加缓冲区大小。

标签: c++ c++11 asynchronous file-io


【解决方案1】:

在进行异步写入之前,如果您使用 IOStreams,您可能希望避免意外刷新流,例如,使用std::endl,而是使用'\n'。由于写入 IOStreams 是缓冲的,这可以大大提高性能。

如果这还不够,下一个问题是如何写入数据。如果正在进行大量格式化,则实际格式化可能会花费大部分时间。您可能能够将格式化推送到一个单独的线程中,但这与仅仅将几个字节的写入传递给另一个线程完全不同:您需要传递一个合适的数据结构来保存要格式化的数据。不过,适合的内容取决于您实际编写的内容。

最后,如果将缓冲区写入文件确实是瓶颈,并且您想坚持使用标准 C++ 库,那么有一个编写线程来侦听一个来自合适的流缓冲区的缓冲区填充的队列,并且将缓冲区写入std::ofstream:生产者接口将是std::ostream,当缓冲区已满或流被刷新时(我将明确使用std::flush),它将发送可能固定大小的缓冲区到另一个读取侦听的队列。以下是仅使用标准库工具快速实现该想法的方法:

#include <condition_variable>
#include <fstream>
#include <mutex>
#include <queue>
#include <streambuf>
#include <string>
#include <thread>
#include <vector>

struct async_buf
    : std::streambuf
{
    std::ofstream                 out;
    std::mutex                    mutex;
    std::condition_variable       condition;
    std::queue<std::vector<char>> queue;
    std::vector<char>             buffer;
    bool                          done;
    std::thread                   thread;

    void worker() {
        bool local_done(false);
        std::vector<char> buf;
        while (!local_done) {
            {
                std::unique_lock<std::mutex> guard(this->mutex);
                this->condition.wait(guard,
                                     [this](){ return !this->queue.empty()
                                                   || this->done; });
                if (!this->queue.empty()) {
                    buf.swap(queue.front());
                    queue.pop();
                }
                local_done = this->queue.empty() && this->done;
            }
            if (!buf.empty()) {
                out.write(buf.data(), std::streamsize(buf.size()));
                buf.clear();
            }
        }
        out.flush();
    }

public:
    async_buf(std::string const& name)
        : out(name)
        , buffer(128)
        , done(false)
        , thread(&async_buf::worker, this) {
        this->setp(this->buffer.data(),
                   this->buffer.data() + this->buffer.size() - 1);
    }
    ~async_buf() {
        std::unique_lock<std::mutex>(this->mutex), (this->done = true);
        this->condition.notify_one();
        this->thread.join();
    }
    int overflow(int c) {
        if (c != std::char_traits<char>::eof()) {
            *this->pptr() = std::char_traits<char>::to_char_type(c);
            this->pbump(1);
        }
        return this->sync() != -1
            ? std::char_traits<char>::not_eof(c): std::char_traits<char>::eof();
    }
    int sync() {
        if (this->pbase() != this->pptr()) {
            this->buffer.resize(std::size_t(this->pptr() - this->pbase()));
            {
                std::unique_lock<std::mutex> guard(this->mutex);
                this->queue.push(std::move(this->buffer));
            }
            this->condition.notify_one();
            this->buffer = std::vector<char>(128);
            this->setp(this->buffer.data(),
                       this->buffer.data() + this->buffer.size() - 1);
        }
        return 0;
    }

};

int main()
{
    async_buf    sbuf("async.out");
    std::ostream astream(&sbuf);
    std::ifstream in("async_stream.cpp");
    for (std::string line; std::getline(in, line); ) {
        astream << line << '\n' << std::flush;
    }
}

【讨论】:

  • AndrewSpott:使用文件流的默认设置进行缓冲。您可以通过调用 stream.rdbuf()-&gt;setbuf(0, 0) 来禁用文件流缓冲。
  • @zangw:当缓冲区已满时,它应该自动刷新:overflow() 在写入缓冲区中没有更多空间的字符时调用[基于流所知道的:还有一个字符空间可以将参数粘贴到overflow() in]。如果要在缓冲区未满的情况下发送数据,则需要flush(当流被销毁时,它将flush)。上面的实现将数据分成 128 个字节的单元。当然,可以更改常量(我还没有分析代码以查看哪种大小最有意义)。
  • @zangw:好吧,当然。上面的类的设计写了一个缓冲区,并在它满时将其交给另一个线程,使用一个新的缓冲区来写入。可能有一个可用缓冲区队列(如果没有可用缓冲区,最好使用这些缓冲区并创建一个新缓冲区),但这没有实现。您还可以在 overflow() 中增加缓冲区,并且仅在显式刷新时发送它(即,当调用 sync() 时)。
  • @zwang:接下来,您似乎将此站点与某些免费劳动力来源混淆了,而事实并非如此。如果您希望有人审查您的代码,您需要将其放在例如 codereview 上,可能会从此处的评论中指向它,以在您的代码审查中引起对这个问题感兴趣的人的注意。如果你有具体的问题,你可以在这里问。
  • @zangw:关于上面提出的具体问题:您应该创建一个问题,而不是在评论中提问。 .. 简短的回答是:是否可以使用一个缓冲区与流缓冲区无关,而是与如何在两个线程之间同步对缓冲区的访问。如果您确保线程不会触及一个缓冲区中的相同字节,那么一切都会好起来的。如果您最终在另一个线程中以非同步方式访问的线程中写入一个字节,则您的行为未定义。
【解决方案2】:

在网上搜索“双缓冲”。

一般来说,一个线程将写入一个或多个缓冲区。另一个线程从缓冲区中读取,“追逐”写入线程。

这可能不会使您的程序更有效率。文件的效率是通过写入大块来实现的,这样驱动器就没有机会减速。许多字节的一次写入比几个字节的多次写入更有效。

这可以通过让写入线程仅在缓冲区内容超过某个阈值(如 1k)时写入来实现。

还要研究“假脱机”或“打印假脱机”的主题。

您需要使用 C++11,因为以前的版本在标准库中没有线程支持。我不知道你为什么要限制自己,因为 Boost 里面有一些好东西。

【讨论】:

    猜你喜欢
    • 2011-02-06
    • 2017-06-07
    • 1970-01-01
    • 2012-08-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多