【问题标题】:What is the correct way of constructing multiple threads to get best performance?构建多个线程以获得最佳性能的正确方法是什么?
【发布时间】:2020-04-04 13:27:51
【问题描述】:

我想知道在使用 lambda 表达式来定义线程时,它是否有助于提高一些性能增益。就我而言,我必须运行多个线程。这是基于实时的应用程序。因此,如果有人向我建议创建多个线程的最佳方式是什么。创建线程发生在实际代码库的每次迭代中。这是高级迭代中发生的事情的一个示例。因此,这是一种需要优化的昂贵操作。

  #include <iostream>
  #include <thread>
  #include <vector>
  #include <algorithm>

  class Task
  {
    public:
    void execute(std::string command)
    {
      //TODO actual logic
      for(int i = 0; i < 5; i++)
      {
        std::cout<<command<<std::endl;
      }
    }
  };

  int main()
  {          
      Task* taskPtr = new Task();
      std::vector<std::thread> workers_older;
      for (int i = 0; i < 2; i++) {
          workers_older.push_back(std::thread(&Task::execute, taskPtr, "Task: without lambda expression"+ std::to_string(i)));
      }
      std::for_each(workers_older.begin(), workers_older.end(), [](std::thread &t) 
      {
          t.join();
      });

      std::vector<std::thread> workers;
      for (int i = 0; i < 2; i++) {
          workers.push_back(std::thread([&]() 
          {
              taskPtr->execute("Task: "+ std::to_string(i));
          }));
      }
      std::for_each(workers.begin(), workers.end(), [](std::thread &t) 
      {
          t.join();
      });
      return 0;
  }

编辑: 在关于要做什么的宝贵意见之后,我已经按照他们的建议提供了答案

【问题讨论】:

  • 如果 lambda 有所作为,我会感到非常惊讶。但我鼓励你衡量它。一种解决方案是拥有一个工人池,而不是每次都产生一个线程。然而,性能增益(或损失)完全取决于你真正在做什么。原因很明显,在您向我们展示的代码中,使用池会降低性能(因为您只有 4 个任务要处理)。
  • 很难给你一个明确的答案,平台会影响结果,我建议你在profiler中尝试不同的解决方案。话虽如此,我会假设启动线程、上下文切换和其他线程内容将完全超过使用 lambda 或将参数直接发送到 std::thread 构造函数的差异。
  • 每次迭代都会创建线程?这些迭代在哪里?
  • 由于您担心线程创建的成本(您这样做是对的!),您可能希望重用线程。也许您正在寻找类似线程池的东西stackoverflow.com/questions/26516683/reusing-thread-in-loop-c
  • OT:为什么不简单地for (auto&amp; t : workers) t.join();?另外,return 0;main 中是多余的。

标签: c++ multithreading performance c++11


【解决方案1】:

非常感谢大家提出的所有非常宝贵的想法。我决定为该任务使用线程池。对不起,关于没有解释整个逻辑很长,我认为这不是必需的。

这是我提出的解决方案。我从here 获得了初始代码,并按照我想要的方式进行了修改。

    #include <iostream>
    #include <unistd.h>
    #include <iostream>
    #include <thread>
    #include <vector>
    #include <algorithm>
    #include <boost/shared_ptr.hpp>
    #include <boost/make_shared.hpp>

    #include <boost/thread.hpp>
    #include <boost/bind.hpp>
    #include <boost/asio.hpp>
    #include <boost/move/move.hpp>
    #include <boost/make_unique.hpp>

    namespace asio = boost::asio; 

    typedef boost::packaged_task<int> task_t;
    typedef boost::shared_ptr<task_t> ptask_t;

    class Task
    {
    public:
    int execute(std::string command)
    {
      //TODO actual logic
      std::cout<< "\nThread:" << command << std::endl;
      int sum = 0;
      for(int i = 0; i < 5; i++)
      {
        sum+=i;
      }
      return sum;
    }
  };


    void push_job(Task* worker, std::string seconds, boost::asio::io_service& io_service
                , std::vector<boost::shared_future<int> >& pending_data) {
      ptask_t task = boost::make_shared<task_t>(boost::bind(&Task::execute, worker, seconds));
      boost::shared_future<int> fut(task->get_future());
      pending_data.push_back(fut);
      io_service.post(boost::bind(&task_t::operator(), task));
    }

    int main()
    {
        Task* taskPtr = new Task();

        boost::asio::io_service io_service;
        boost::thread_group threads;
        std::unique_ptr<boost::asio::io_service::work> service_work;
        service_work = boost::make_unique<boost::asio::io_service::work>(io_service);
        for (int i = 0; i < boost::thread::hardware_concurrency() ; ++i)
        {
          threads.create_thread(boost::bind(&boost::asio::io_service::run,
            &io_service));
        }
        std::vector<boost::shared_future<int> > pending_data; // vector of futures

        push_job(taskPtr, "4", io_service, pending_data);
        push_job(taskPtr, "5", io_service, pending_data);
        push_job(taskPtr, "6", io_service, pending_data);
        push_job(taskPtr, "7", io_service, pending_data);

        boost::wait_for_all(pending_data.begin(), pending_data.end());
        int total_sum = 0;
        for(auto result : pending_data){
           total_sum += result.get();
        }
        std::cout<< "Total sum: "<< total_sum << std::endl;
        return 0;
    }

【讨论】:

    【解决方案2】:

    使用线程时最大的开销来自启动线程、调度、上下文切换和缓存利用。与此相比,额外间接函数指针的开销可以忽略不计。

    为了获得最佳性能,请记住以下几点:

    • 保留一个包含 N 个线程的池,其中 N = std::thread::hardware_concurrency()(系统中的逻辑处理器数)
    • 将 N-1 个作业提交到池中,并在调用线程中运行第 N 个作业。不将第 N 个作业提交到池中可以节省大量资金
    • 避免虚假共享。不同线程写入的数据应该在不同的缓存行中
    • 更多活动线程通常意味着更大的工作集。因此 D-cache 利用率可能会降低,从而影响性能

    这是我的工作示例:

    #include <iostream>
    #include <memory>
    #include <thread>
    #include <vector>
    #include <boost/asio.hpp>
    
    struct thread_pool {
        thread_pool(int threads = std::thread::hardware_concurrency()) : size(threads) {
            grp.reserve(threads);
            for (int i = 0; i < threads; ++i)
                grp.emplace_back([this] { return service.run(); });
        }
    
        template<typename F, typename ...Args>
        auto enqueue(F& f, Args... args) -> std::future<decltype(f(args...))> {
            return boost::asio::post(service,
                std::packaged_task<decltype(f(args...))()>([&f, args...]{ return f(args...); })
            );
        }
    
        ~thread_pool() {
            service_work.reset();
            for (auto &t : grp)
                if (t.joinable())
                    t.join();
            service.stop();
        }
    
        const int size;
    private:
        boost::asio::io_service service;
        std::unique_ptr<boost::asio::io_service::work> service_work {new boost::asio::io_service::work(service)};
        std::vector<std::thread> grp;
    };
    
    int main() {
        thread_pool pool;
        std::vector<std::future<int>> results;
        auto task = [](int i) { return i + 1; };
        for (int i = 0; i < pool.size - 1; i++) {
            results.emplace_back(pool.enqueue(task, i));
        }
        int sum = task(pool.size - 1); // last task run synchronously
        for (auto& res : results) {
            sum += res.get();
        }
        std::cout << sum << std::endl;
    }
    

    【讨论】:

    • 感谢您的宝贵意见,但我猜此代码仅适用于 c+14 以后?但是,我提供的答案也适用于 c++11。
    • @GPrathap 只有 make_unique 是 C++14。我更新了q。所以它现在应该适用于 C++11。
    【解决方案3】:

    将成员函数的地址和一组参数传递给 std::thread 构造函数与传递带有适当捕获的 lambda 函数之间的开销差别很小。

    std::thread 构造函数中的大开销实际上是启动线程本身。

    如果您知道要在程序中的多个位置使用相同数量的工作线程,那么将它们作为具有任务队列的长时间运行线程保留可能是值得的。

    【讨论】:

    • 是的,我知道每次迭代要创建的线程数。因此,我将使用线程池。
    猜你喜欢
    • 2017-08-12
    • 2019-02-26
    • 2016-12-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-01-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多