【问题标题】:Writing in file from shared buffer missing data and program crash without cout从共享缓冲区写入文件丢失数据并且程序在没有 cout 的情况下崩溃
【发布时间】:2019-06-21 10:01:26
【问题描述】:

我正在使用线程和共享缓冲区制作程序。两个线程在后台无限期地运行,一个线程将用数据填充共享缓冲区,另一个线程将共享缓冲区的内容写入文件。

用户可以启动或停止数据填充,导致线程进入等待状态,直到用户再次启动线程。每个循环缓冲区填充 50 个浮点数。

这是代码:


#include <iostream>
#include <vector>
#include <iterator>
#include <utility>
#include <fstream>
#include <condition_variable>
#include <mutex>
#include <thread>

using namespace std;

std::mutex m;
std::condition_variable cv;
std::vector<std::vector<float>> datas;
bool keep_running = true, start_running = false;

void writing_thread()
{
    ofstream myfile;

    bool opn = false;

    while(1)
    {

        while(keep_running)
        {
            // Open the file only once
            if(!opn)
            {
                myfile.open("IQ_Datas.txt");
                opn = true;

            }


            // Wait until main() sends data
            std::unique_lock<std::mutex> lk(m);

            cv.wait(lk, [] {return !datas.empty();});


            auto d = std::move(datas);


            lk.unlock();


            for(auto &entry : d)
            {
                for(auto &e : entry)
                    myfile << e << endl;
            }


        }

        if(opn)
        {
            myfile.close();
            opn = false;
        }

    }
}

void sending_thread()
{

    std::vector<float> m_buffer;
    int cpt=0;
    //Fill the buffer with 50 floats
    for(float i=0; i<50; i++)
        m_buffer.push_back(i);

    while(1)
    {
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [] {return keep_running && start_running;});

        }
        while(keep_running)
        {

            //Each loop d is containing 50 floats
            std::vector<float> d = m_buffer;

            cout << "in3" << endl; //Commenting this line makes the program crash

            {
                std::lock_guard<std::mutex> lk(m);
                if (!keep_running)break;
                datas.push_back(std::move(d));
            }
            cv.notify_one();
            cpt++;
        }

        cout << "Total data: " << cpt*50 << endl;
        cpt = 0;
    }
}
void start()
{
    {
        std::unique_lock<std::mutex> lk(m);
        start_running = true;
    }
    cv.notify_all();
}
void stop()
{
    {
        std::unique_lock<std::mutex> lk(m);
        start_running = false;
    }
    cv.notify_all();
}

int main()
{
    int go = 0;
    thread t1(sending_thread);
    thread t2(writing_thread);

    t1.detach();
    t2.detach();

    while(1)
    {

        std::cin >> go;

        if(go == 1)
        {
            start();
            keep_running = true;
        }
        else if(go == 0)
        {
            stop();
            keep_running = false;
        }


    }

    return 0;
}


我对这段代码有 2 个问题:

  • 在注释cout &lt;&lt; "in3" &lt;&lt; endl; 行时,程序将在大约 20-40 秒后崩溃并显示错误消息:terminate call after throwing an instance of 'std::bad_alloc' 什么():std::bad_alloc。如果我让 cout,程序将毫无问题地运行。

  • 程序运行时,停止sending_thread后,我会显示用cout &lt;&lt; "Total data: " &lt;&lt; cpt*50 &lt;&lt; endl;复制的数据总量。对于少量数据,所有数据都正确写入文件,但当数据量大时,会丢失数据。 Missing/Correct data(文件总行数与total data不匹配)

为什么使用 cout 程序运行正常?是什么导致数据丢失?是因为sending_thread 填充缓冲区太快,而writing_thread 写入文件需要太多时间吗?

编辑: 一些精确度,在sending_thread 中添加更多 cout 似乎可以解决所有问题。第一个线程产生了 2100 万个浮点数,第二个线程成功地在文件中写入了 2100 万个浮点数。似乎没有 cout,生产者线程的工作速度太快,消费者线程无法在将数据写入文件时不断从共享缓冲区中检索数据。

【问题讨论】:

  • 您的主线程似乎更改了 keep_running 共享布尔变量而没有任何互斥。因此,更改可以保持“被困”在运行主线程的 cpu 内核的缓存中。然而,写入“in3”的事实是一个系统调用,它可以强制刷新缓存,因此可能解释了行为的变化。您也应该在主线程中使用适当的互斥。
  • 好的,我可以改变函数startstop中keep_running的值来避免这种行为吗?此外,keep_running 可以一直被困在缓存中的事实导致 bad_alloc 错误?
  • 是的,将共享变量的更改移动到受锁保护的区域应该会有所帮助。此外,在您的初始代码中,生产者线程可能会看到更改并以不间断的方式生产,而消费者线程无法看到更改并保持不变。因此,大量数据被缓冲,直到资源耗尽。但是,一般来说,推测为什么不同步的程序会发生这样那样的行为并不是很好地利用人们的时间。
  • 你需要重新考虑这个auto d = std::move(datas);,这将使你的datas处于未指定状态,也许在move之后重建datas给它一个新的初始状态将解决你丢失的数据。跨度>
  • @muaz 是的,我按照 Ted Lyngmo 的建议对其进行了修改。 auto d = std::move(datas); 现在是 datas.swap(d);。只有当我放 3 cout 时才没有丢失数据(见我的编辑),真的很奇怪。

标签: c++ multithreading file crash


【解决方案1】:

避免:

Moved-from object 'datas' of type 'std::vector' is moved:
        auto d = std::move(datas);
                 ^~~~~~~~~~~~~~~~

替换这个:

        // Wait until main() sends data
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [] {return !datas.empty();});
        auto d = std::move(datas);
        lk.unlock();

有了这个:

        // Wait until main() sends data            
        std::vector<std::vector<float>> d;
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [] { return !datas.empty(); });
            datas.swap(d);
        }

还将从多个线程访问的bool 变量替换为std::atomic_boolstd::atomic_flag

bad_alloc 来自sending_threadwriting_thread 快得多,所以它会耗尽内存。当您放慢sending_thread 的速度足够(打印)时,问题就不那么明显了,但您应该有一些同步才能正确完成。您可以围绕它创建一个包装器类并提供插入和提取方法以确保所有访问都正确同步并为其提供最大数量的元素。一个例子:

template<typename T>
class atomic2dvector {
public:
    atomic2dvector(size_t max_elements) : m_max_elements(max_elements) {}

    atomic2dvector(const atomic2dvector&) = delete;
    atomic2dvector(atomic2dvector&&) = delete;
    atomic2dvector& operator=(const atomic2dvector&) = delete;
    atomic2dvector& operator=(atomic2dvector&&) = delete;

    ~atomic2dvector() { shutdown(); }

    bool insert_one(std::vector<T>&& other) {
        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_current_elements + m_data.size() > m_max_elements && m_shutdown == false)
            m_cv.wait(lock);
        if(m_shutdown) return false;

        m_current_elements += other.size();
        m_data.emplace_back(std::forward<std::vector<T>>(other));

        m_cv.notify_one();
        return true;
    }
    std::vector<std::vector<T>> extract_all() {
        std::vector<std::vector<T>> return_value;

        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_data.empty() && m_shutdown == false) m_cv.wait(lock);

        if(m_shutdown == false) {
            m_current_elements = 0;
            return_value.swap(m_data);
        } else {
            // return an empty vector if we should shutdown
        }
        m_cv.notify_one();

        return return_value;
    }

    bool is_active() const { return m_shutdown == false; }

    void shutdown() {
        m_shutdown = true;
        m_cv.notify_all();
    }

private:
    size_t m_max_elements;
    size_t m_current_elements = 0;
    std::atomic<bool> m_shutdown = false;
    std::condition_variable m_cv{};
    std::mutex m_mtx{};
    std::vector<std::vector<T>> m_data{};
};

如果您想在关机后继续提取数据,您可以将extract_all() 更改为:

   std::vector<std::vector<T>> extract_all() {
        std::vector<std::vector<T>> return_value;

        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_data.empty() && m_shutdown == false) m_cv.wait(lock);

        m_current_elements = 0;
        return_value.swap(m_data);
        m_cv.notify_one();

        return return_value;
    }

一个完整的例子可能如下所示:

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <fstream>
#include <iostream>
#include <iterator>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

using namespace std;

template<typename T>
class atomic2dvector {
public:
    atomic2dvector(size_t max_elements) : m_max_elements(max_elements) {}
    atomic2dvector(const atomic2dvector&) = delete;
    atomic2dvector(atomic2dvector&&) = delete;
    atomic2dvector& operator=(const atomic2dvector&) = delete;
    atomic2dvector& operator=(atomic2dvector&&) = delete;

    ~atomic2dvector() { shutdown(); }

    bool insert_one(std::vector<T>&& other) {
        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_current_elements + m_data.size() > m_max_elements &&
              m_shutdown == false)
            m_cv.wait(lock);
        if(m_shutdown) return false;

        m_current_elements += other.size();
        m_data.emplace_back(std::forward<std::vector<T>>(other));

        m_cv.notify_one();
        return true;
    }
    std::vector<std::vector<T>> extract_all() {
        std::vector<std::vector<T>> return_value;

        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_data.empty() && m_shutdown == false) m_cv.wait(lock);

        m_current_elements = 0;
        return_value.swap(m_data);
        m_cv.notify_one();

        return return_value;
    }

    bool is_active() const { return m_shutdown == false; }

    void shutdown() {
        m_shutdown = true;
        m_cv.notify_all();
    }

private:
    size_t m_max_elements;
    size_t m_current_elements = 0;
    std::atomic<bool> m_shutdown = false;
    std::condition_variable m_cv{};
    std::mutex m_mtx{};
    std::vector<std::vector<T>> m_data{};
};

std::mutex m;
std::condition_variable cv;
atomic2dvector<float> datas(256 * 1024 * 1024 / sizeof(float)); // 0.25 GiB limit
std::atomic_bool start_running = false;

void writing_thread() {
    std::ofstream myfile("IQ_Datas.txt");
    if(myfile) {
        std::cout << "writing_thread waiting\n";

        std::vector<std::vector<float>> d;
        while((d = datas.extract_all()).empty() == false) {
            std::cout << "got " << d.size() << "\n";

            for(auto& entry : d) {
                for(auto& e : entry) myfile << e << "\n";
            }
            std::cout << "wrote " << d.size() << "\n\n";
        }
    }
    std::cout << "writing_thread shutting down\n";
}

void sending_thread() {
    std::vector<float> m_buffer;
    std::uintmax_t cpt = 0;
    // Fill the buffer with 50 floats
    for(float i = 0; i < 50; i++) m_buffer.push_back(i);

    while(true) {
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [] {
                return start_running == true || datas.is_active() == false;
            });
        }
        if(datas.is_active() == false) break;
        std::cout << "sending...\n";
        while(start_running == true) {
            // Each loop d is containing 50 floats
            std::vector<float> d = m_buffer;
            if(datas.insert_one(std::move(d)) == false) break;
            cpt++;
        }
        cout << "Total data: " << cpt * 50 << endl;
        cpt = 0;
    }
    std::cout << "sending_thread shutting down\n";
}

void start() {
    std::unique_lock<std::mutex> lk(m);
    start_running = true;
    cv.notify_all();
}
void stop() {
    std::unique_lock<std::mutex> lk(m);
    start_running = false;
    cv.notify_all();
}
void quit() {
    datas.shutdown();
    cv.notify_all();
}

int main() {
    int go = 0;
    thread t1(sending_thread);
    thread t2(writing_thread);

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::cout << "Enter 1 to make the sending thread send and 0 to make it stop "
                 "sending. Enter a non-integer to shutdown.\n";

    while(std::cin >> go) {
        if(go == 1) {
            start();
        } else if(go == 0) {
            stop();
        }
    }
    std::cout << "--- shutting down ---\n";
    quit();

    std::cout << "joining threads\n";
    t1.join();
    std::cout << "t1 joined\n";
    t2.join();
    std::cout << "t2 joined\n";
}

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多