【问题标题】:Reusing thread in loop c++在循环c ++中重用线程
【发布时间】:2014-12-18 10:44:51
【问题描述】:

我需要在 C++ 程序中并行化一些任务,并且对并行编程完全陌生。到目前为止,我通过互联网搜索取得了一些进展,但现在有点卡住了。我想在循环中重用一些线程,但显然不知道该怎么做。

我正在从计算机上的两个ADC卡采集数据(并行采集),那么我需要在采集下一批数据的同时对采集到的数据(并行处理)进行一些操作。这里有一些伪代码来说明

//Acquire some data, wait for all the data to be acquired before proceeding
std::thread acq1(AcquireData, boardHandle1, memoryAddress1a);
std::thread acq2(AcquireData, boardHandle2, memoryAddress2a);
acq1.join();
acq2.join();

while(user doesn't interrupt)
{

//Process first batch of data while acquiring new data
std::thread proc1(ProcessData,memoryAddress1a);
std::thread proc2(ProcessData,memoryAddress2a);
acq1(AcquireData, boardHandle1, memoryAddress1b);
acq2(AcquireData, boardHandle2, memoryAddress2b);
acq1.join();
acq2.join();
proc1.join();
proc2.join();
/*Proceed in this manner, alternating which memory address 
is written to and being processed until the user interrupts the program.*/
}

这就是它的主要内容。循环的下一次运行将在处理“b”数据时写入“a”内存地址并继续交替(我可以让代码执行此操作,只是将其取出以防止问题混乱)。

无论如何,问题(我相信有些人已经知道了)是我第二次尝试使用 acq1 和 acq2 时,编译器 (VS2012) 说“IntelliSense:调用没有适当的 operator() 或将函数转换为指向函数类型的指针”。同样,如果我再次将 std::thread 放在 acq1 和 acq2 前面,它会显示“错误 C2374:'acq1':重新定义;多重初始化”。

所以问题是,我可以在线程完成之前的任务后将它们重新分配给新任务吗?我总是在再次调用之前等待线程的先前使用结束,但我不知道如何重新分配线程,并且由于它处于循环中,我无法每次都创建一个新线程(或者如果我可以,这似乎是浪费和不必要的,但我可能弄错了)。

提前致谢

【问题讨论】:

    标签: c++ multithreading loops reusability


    【解决方案1】:

    您也可以创建自己的 Thread 类并调用它的 run 方法,如下所示:

    class MyThread
    {
    public:
    void run(std::function<void()> func) {
       thread_ = std::thread(func);
    }
    void join() {
       if(thread_.joinable())
          thread_.join();
    }
    private:
       std::thread thread_;
    };
    
    // Application code...
    MyThread myThread;
    myThread.run(AcquireData);
    

    【讨论】:

      【解决方案2】:

      我认为您需要一个更简单的答案来多次运行一组线程,这是最好的解决方案:

      do{
      
          std::vector<std::thread> thread_vector;
      
           for (int i=0;i<nworkers;i++)
           {
             thread_vector.push_back(std::thread(yourFunction,Parameter1,Parameter2, ...));
          }
      
          for(std::thread& it: thread_vector)
          { 
            it.join();
          }
         q++;
      } while(q<NTIMES);
      

      【讨论】:

        【解决方案3】:

        最简单的方法是使用std::function 对象的可等待队列。像这样:

        #include <iostream>
        #include <thread>
        #include <mutex>
        #include <condition_variable>
        #include <queue>
        #include <functional>
        #include <chrono>
        
        
        class ThreadPool
        {
            public:
        
            ThreadPool (int threads) : shutdown_ (false)
            {
                // Create the specified number of threads
                threads_.reserve (threads);
                for (int i = 0; i < threads; ++i)
                    threads_.emplace_back (std::bind (&ThreadPool::threadEntry, this, i));
            }
        
            ~ThreadPool ()
            {
                {
                    // Unblock any threads and tell them to stop
                    std::unique_lock <std::mutex> l (lock_);
        
                    shutdown_ = true;
                    condVar_.notify_all();
                }
        
                // Wait for all threads to stop
                std::cerr << "Joining threads" << std::endl;
                for (auto& thread : threads_)
                    thread.join();
            }
        
            void doJob (std::function <void (void)> func)
            {
                // Place a job on the queu and unblock a thread
                std::unique_lock <std::mutex> l (lock_);
        
                jobs_.emplace (std::move (func));
                condVar_.notify_one();
            }
        
            protected:
        
            void threadEntry (int i)
            {
                std::function <void (void)> job;
        
                while (1)
                {
                    {
                        std::unique_lock <std::mutex> l (lock_);
        
                        while (! shutdown_ && jobs_.empty())
                            condVar_.wait (l);
        
                        if (jobs_.empty ())
                        {
                            // No jobs to do and we are shutting down
                            std::cerr << "Thread " << i << " terminates" << std::endl;
                            return;
                         }
        
                        std::cerr << "Thread " << i << " does a job" << std::endl;
                        job = std::move (jobs_.front ());
                        jobs_.pop();
                    }
        
                    // Do the job without holding any locks
                    job ();
                }
        
            }
        
            std::mutex lock_;
            std::condition_variable condVar_;
            bool shutdown_;
            std::queue <std::function <void (void)>> jobs_;
            std::vector <std::thread> threads_;
        };
        
        void silly (int n)
        {
            // A silly job for demonstration purposes
            std::cerr << "Sleeping for " << n << " seconds" << std::endl;
            std::this_thread::sleep_for (std::chrono::seconds (n));
        }
        
        int main()
        {
            // Create two threads
            ThreadPool p (2);
        
            // Assign them 4 jobs
            p.doJob (std::bind (silly, 1));
            p.doJob (std::bind (silly, 2));
            p.doJob (std::bind (silly, 3));
            p.doJob (std::bind (silly, 4));
        }
        

        【讨论】:

        • 不错的 KISS 解决方案,没有额外的装饰。
        • 对于无限循环函数(傻)这个线程池不是并发的。
        • @AmirForsati 我不明白你的意思。
        • 想象一下我们管理用户套接字 i/o 的函数。我们将在这些函数内部使用无限循环while(true) 来读取和写入用户流。只有第一个函数会被执行,其他函数会在队列中等待!线程池的概念是让线程通过在任务之间切换来并发执行任务。
        • @AndrewPilikin 按照编译器的说明编译使用 pthread 的代码。也许它需要-pthread
        【解决方案4】:

        std::thread 类被设计为只执行一项任务(您在构造函数中给它的任务)然后结束。如果你想做更多的工作,你需要一个新线程。从 C++11 开始,这就是我们所拥有的。线程池没有成为标准。 (我不确定 C++14 对它们有什么看法。)

        幸运的是,您可以轻松地自己实现所需的逻辑。这是大图:

        • 启动 n 个工作线程,它们都执行以下操作:
          • 有更多工作要做时重复:
            • 抓住下一个任务t(可能要等到一个准备好)。
            • 处理t
        • 继续在处理队列中插入新任务。
        • 告诉工作线程没有什么可做的了。
        • 等待工作线程完成。

        这里最困难的部分(仍然相当容易)是正确设计工作队列。通常,一个同步链表(来自 STL)可以做到这一点。同步意味着任何希望操作队列的线程只能在它获得std::mutex 之后才这样做,以避免竞争条件。如果工作线程发现列表为空,它必须等待,直到再次有一些工作。您可以为此使用std::condition_variable。每次将新任务插入队列时,插入线程通知一个等待条件变量的线程,因此将停止阻塞并最终开始处理新任务。

        第二个不太重要的部分是如何向工作线程发出信号,表明没有更多工作要做。显然,您可以设置一些全局标志,但如果一个工作人员被阻塞在队列中等待,它不会很快意识到。一种解决方案可能是notify_all() 线程,并让他们在每次收到通知时检查标志。另一种选择是在队列中插入一些不同的“有毒”项目。如果工人遇到这个项目,它会自行退出。

        使用您自定义的 task 对象或简单的 lambda 可以直接表示任务队列。

        以上都是 C++11 的特性。如果您坚持使用早期版本,则需要求助于为您的特定平台提供多线程的第三方库。

        虽然这些都不是火箭科学,但第一次仍然很容易出错。不幸的是,与并发相关的错误是最难调试的错误之一。从花几个小时阅读一本好书的相关部分或完成教程开始,很快就会得到回报。

        【讨论】:

        • 感谢您的回答,非常详尽且写得很好。如果下面 luk32 的回答对我不起作用,我将把它保存为 B 计划。你的可能是做事的“正确”方式。
        【解决方案5】:

        嗯,这取决于您是否考虑进行重新分配。您可以移动线程但不能复制它。

        以下代码将在每次迭代中创建一对新线程并将它们移动到旧线程的位置。我想这应该可行,因为新的 thread 对象将是临时对象。

        while(user doesn't interrupt)
        {
        //Process first batch of data while acquiring new data
        std::thread proc1(ProcessData,memoryAddress1a);
        std::thread proc2(ProcessData,memoryAddress2a);
        acq1 = std::thread(AcquireData, boardHandle1, memoryAddress1b);
        acq2 = std::thread(AcquireData, boardHandle2, memoryAddress2b);
        acq1.join();
        acq2.join();
        proc1.join();
        proc2.join();
        /*Proceed in this manner, alternating which memory address 
        is written to and being processed until the user interrupts the program.*/
        }
        

        发生的事情是,对象实际上并没有在迭代结束时结束它的生命周期,因为它是在关于循环的外部范围内声明的。但是每次都会创建一个新对象并发生move。我看不出有什么可以幸免的(我可能很愚蠢),所以我想这与在循环中声明 acqs 并简单地重用符号完全相同。总而言之...是的,它是关于如何对创建临时和move 进行分类。

        此外,这显然会在每个循环中启动一个新线程(当然会结束先前分配的线程),它不会让线程等待新数据并神奇地将其馈送到处理管道。您需要以不同的方式实现它。例如:工作线程池和队列通信。

        参考:operator=(ctor)

        我认为你得到的错误是不言自明的,所以我将跳过解释它们。

        【讨论】:

        • 谢谢,我试过了,它似乎可以根据需要工作。时间会证明它是否有任何问题,但现在它的帮助很大!
        【解决方案6】:

        这个

         std::thread acq1(...)
        

        是构造函数的调用。构造一个名为 acq1 的新对象

        这个

          acq1(...)
        

        是 () 运算符对现有对象 aqc1 的应用。如果没有为 std::thread 定义这样的运算符,编译器会抱怨。

        据我所知,您可能不会重用 std::threads。您构建并启动它们。加入他们并扔掉他们,

        【讨论】:

        • 感谢 Oncaphillis,在您的回复和 luk32 之间,我想我意识到我的语法错误是什么。
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-06-17
        • 1970-01-01
        • 2014-11-12
        • 1970-01-01
        • 2021-06-01
        相关资源
        最近更新 更多