【问题标题】:Can a thread be reused to run variadic functions?可以重用线程来运行可变参数函数吗?
【发布时间】:2013-03-06 01:37:02
【问题描述】:

我正在尝试在 C++ 中创建一个 boost 线程,该线程可以重复用于运行具有不同数量和类型 args 的各种函数。

这可以用 C++11x 可变参数来完成吗?

在我的用例中,我不需要队列(如果线程很忙,那么方法就会失败),但如果需要实现这个“统一”功能,我会很不情愿地这样做。

我不明白如何使用 bind 或 lambda 处理统一,以便一个线程可以调用不同的函数,每个函数都有自己的不同数量和类型的 args。

我的想法大致如下:

class WorkThread
{
public:
   WorkThread()
       {
       // create thread and bind runner to thread
       }

   ~WorkThread()
       {
       // tell runner to exit and wait and reap the thread
       }

   template<typename F,typename ... Arguments>
   void doWork(F func, Arguments... args)
       {
       if already busy
           return false;
       // set indication to runner that there is new work
       // here: how to pass f and args to runner?
       }

private:
   void runner()
        {
        while ( ! time to quit )
            {
            wait for work
            // here: how to get f and args from doWork? do I really need a queue? could wait on a variadic signal maybe?
            f(args);
            }
        }

   boost::thread* m_thread;
};

class ThreadPool
{
public:
    template<typename F, typename ... Arguments>
    bool   doWork(F func,Arguments... args)
          {
          const int i = findAvailableWorkThread();
          m_thread[i].doWork(f,args);
          }
private:
   // a pool of work threads m_thread;
};

【问题讨论】:

    标签: c++ multithreading boost-thread variadic-templates


    【解决方案1】:

    应该有很多现有的问题来说明如何做到这一点。

    表示任意函数对象的规范 C++11 方法是 std::function&lt;void()&gt;,因此您需要该类型的共享对象,在下面的代码中称为 m_job,它应该受互斥体保护,并且您在尚未设置时为其分配新工作:

    template<typename F,typename ... Arguments>
    bool doWork(F func, Arguments&&... args)
    {
        std::lock_guard<std::mutex> l(m_job_mutex);
        if (m_job)
            return false;
        m_job = std::bind(func, std::forward<Arguments>(args)...);
        m_job_cond.notify_one();  
        return true;   
    }
    

    这使用std::bind 将函数对象及其参数转换为不带参数的函数对象。 std::bind 返回的可调用对象存储了func 和每个参数的副本,并在调用时调用func(args...)

    然后工人只是这样做:

    void runner()
    {
        while ( ! time to quit )
        {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> l(m_job_mutex);
                while (!m_job)
                    m_job_cond.wait(l);
                swap(job, m_job);
            }
            job();
        }
    }
    

    此代码不是线程安全的:

    template<typename F, typename ... Arguments>
        bool   doWork(F func,Arguments... args)
        {
            const int i = findAvailableWorkThread();
            m_thread[i].doWork(f,args);
        }
    

    findAvailableWorkThread 返回后,该线程可能会变得忙碌,因此下一行将失败。您应该检查可用性并在单个操作中传递新作业,例如

    template<typename F, typename ... Arguments>
        bool   doWork(F func,Arguments... args)
        {
            for (auto& t : m_thread)
                if (t.doWork(f,args))
                    return true;
            return false;
        }
    

    【讨论】:

      【解决方案2】:

      您可以使用boost::bind 或C++11 lambda 将所有函数统一为一个签名。因此,您将拥有一个带有循环函数的线程,并且在该循环内您将提取要执行的函数。 例如,您可以使用boost::lockfree::queue&lt;boost::function&lt;void()&gt;&gt; 来实现它。

      作为这个想法的一个例子,您可以使用以下内容:

      class TaskLoop
      {
      public:
          typedef std::function<void()> Task_t;
      public:
          TaskLoop():
              m_IsDone(false)
          {
              m_spThread.reset(new std::thread(&TaskLoop::_run, this));
          }
          ~TaskLoop()
          {
              Task_t task = [this](){m_IsDone = true;};
              postTask(task);
              m_spThread->join();
          }
          void postTask(const Task_t& Msg)
          {
              std::lock_guard<std::mutex> lock(m_Mutex);
              m_Tasks.push(Msg);
          }
          void wait() 
          {
              while(!m_Tasks.empty());
          }
      private:
          bool m_IsDone;
          std::unique_ptr<std::thread> m_spThread; 
          std::mutex m_Mutex;
          std::queue<Task_t> m_Tasks;
      private:
          void _run()
          {
              while(!m_IsDone)
              {
                  Task_t task;
                  m_Mutex.lock();
                  if(!m_Tasks.empty())
                  {
                      task = m_Tasks.front();
                      m_Tasks.pop();
                  }
                  m_Mutex.unlock();
                  if(task)
                      task();
              }
          }
      };
      
      void foo(const std::string& first, int second)
      {
          std::cout << first << second << "\n";
      }
      
      int main(int argc, char **argv)
      {
          TaskLoop loop;
          loop.postTask([]{foo("task", 0);});
          loop.wait();
          return 0;
      }
      

      示例不使用并发队列并且非常简单,因此您需要替换队列并使其适应您的要求

      【讨论】:

      • 不是真的:boost::function 不符合boost::lockfree::queue 的要求
      • 如何将函数、它的返回类型和 arg 打包到队列中?我已经看到提到为此使用元组,那么建议队列将 arg 包作为元组保存吗?我不知道该怎么做。可以显示吗?
      【解决方案3】:

      由于您已经在使用 boost,请参阅 http://think-async.com/Asio/Recipes 的线程池类,它使用 Boost.Asio 将工作函数排队,而无需锁定。

      【讨论】:

        【解决方案4】:

        // 这是我目前所拥有的似乎可行的方法......

        #include <boost/thread.hpp>
        #include <boost/thread/mutex.hpp>
        #include <boost/thread/condition.hpp>
        
        #include <queue>
        #include <atomic>
        
        class ThreadWorker
            {
            public:
                typedef boost::function<void()> Task_t;
                typedef std::unique_ptr<boost::thread>  UniqueThreadPtr_t;
        
                ThreadWorker()
                    : m_timeToQuit(false)
                    {
                    m_spThread = UniqueThreadPtr_t( new boost::thread(std::bind(&ThreadWorker::runner, this)));
                    }
        
                virtual ~ThreadWorker()
                    {
                    quit();
                    }
        
                void quit()
                    {
                    Task_t task = [this]() { m_timeToQuit = true; };
                    enqueue(task);
                    m_spThread->join();
                    }
        
                template<typename F,typename ...Args>
                void enqueue(F&& f, Args&&... args)
                    {
                    boost::mutex::scoped_lock lock(m_jobMutex);
                    m_Tasks.push(std::bind(std::forward<F>(f),std::forward<Args>(args)...));
                    m_newJob.notify_one();
                    }
        
            private:
                std::atomic<bool>       m_timeToQuit;
                UniqueThreadPtr_t       m_spThread;
                mutable boost::mutex    m_jobMutex;
                std::queue<Task_t>      m_Tasks;
                boost::condition        m_newJob;
        
            private:
                void runner()
                    {
                    while ( ! m_timeToQuit )
                        {
                        Task_t task;
                            {
                            boost::mutex::scoped_lock lock(m_jobMutex);
                            while ( ! task )
                                {
                                if ( m_timeToQuit )
                                    {
                                    break;
                                    }
                                if ( m_Tasks.empty() )
                                    {
                                    m_newJob.wait(lock);
                                    }
                                task = m_Tasks.front();
                                m_Tasks.pop();
                                }
                            }
                        if (task)
                            {
                            task();
                            }
                        }
                    }
            };
        

        【讨论】:

        • 如何添加等待结果的功能?我需要一个带有模板返回类型的新版本的 enqueue?
        • 可能类似于:code template::type> auto enqueue( F&& f, Args&&... args) -> std::future{...} 然后如何定义任务队列
        • 有人可以展示如何改变队列和入队来处理期货吗?我想不通。
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-08-16
        • 1970-01-01
        • 2012-11-26
        • 2018-04-30
        • 1970-01-01
        相关资源
        最近更新 更多