【问题标题】:Thread pool using Boost in C++ not working correctly在 C++ 中使用 Boost 的线程池无法正常工作
【发布时间】:2014-03-05 06:58:39
【问题描述】:

我已经实现了来自 How to create a thread pool using boost in C++? 的解决方案,但是 io_service::stop() 函数停止进一步处理我的线程时遇到问题。

在我的情况下,我的池中有 3 个线程,并试图通过它运行大约 11000 条记录。每个记录都独立于其他记录,因此我只是希望通过创建每个记录的并行运行来加快处理速度。

void processRecord (unsigned int i, unsigned int numRecords)
{
    cout << i << "/" << numRecords << endl;

    // do Processing...
}

#define MAX_THREADS 3
unsigned int numRecords=11000

boost::asio::io_service ioService;
boost::thread_group threadPool;

boost::asio::io_service::work work (ioService);

for (unsigned int i=0 ; i<MAX_THREADS ; ++i)
{
    threadPool.create_thread (boost::bind (&boost::asio::io_service::run, &ioService));
}

for (unsigned int i=0 ; i<numRecords ; ++i)
{
    ioService.post (boost::bind (processRecord, i, numRecords);
}

// ioService.stop ();          // Was causing ioService to stop
work.reset();                  // Wait for all work to be finished.
threadPool.join_all ();

processAllRecords ();

我看到的问题是,在对 ioService.post() 的调用将进程推送到池中的循环完成后,它会触发 ioService.stop() 调用并停止所有进一步的处理。这通常发生在实际处理了大约 400 条记录之后。

因此,大约 11000 条记录中只有大约 400 条正在处理。

我不熟悉在 C++ 中使用线程,所以我不确定我缺少什么或如何解决这个问题。任何帮助将不胜感激。

编辑:我修改了上面的代码以反映我为使其工作所做的更改。本质上,ioService.stop() 调用导致所有进一步的处理停止。我用 work.wait() 替换了它,这样它就会等到所有工作完成。

Edit2:我在之前的编辑中使用了错误的函数。应该是 work.reset()。

【问题讨论】:

  • 在完成之前不要调用停止?
  • 我正要发布我在这里找到了解决方案:tonicebrian.com/2012/05/23/thread-pool-in-c。而不是调用 ioService.stop() 我应该调用 work.reset()。这将等待所有“工作”完成,然后我可以执行 join_all()。这解决了这个问题,现在我要追查另一个不相关的问题。
  • 如果您将答案发布给其他人以找到它(并投票!),这将是最有用的
  • 我已经修改了上面的代码以匹配我现在的工作。

标签: c++ multithreading boost


【解决方案1】:

使用您的代码,我将 boost::asio 用于线程组的方式, 将作品括在括号中并使用 scoped_ptr。只是一个想法。

void processRecord (unsigned int i, unsigned int numRecords)
{
    cout << i << "/" << numRecords << endl;

    // do Processing...
}

#define MAX_THREADS 3
unsigned int numRecords=11000

boost::asio::io_service ioService;
boost::thread_group threadPool;

// by using a scoped pointer for the io_service::work
// and enclosing the threading in brackets
// this should run until all the jobs have finished
// and you don't need to call work.reset()  

// added brackets around threading
{
    // made work a boost::scoped_ptr
    boost::scoped_ptr< boost::asio::io_service::work > 
          work ( new boost::asio::io_service(ioService) );

    for (unsigned int i=0 ; i<MAX_THREADS ; ++i)
    {
        threadPool.create_thread (
           boost::bind (&boost::asio::io_service::run, &ioService));
    }

    for (unsigned int i=0 ; i<numRecords ; ++i)
    {
        ioService.post (boost::bind (processRecord, i, numRecords);
    }
}
// now just have to join
threadPool.join_all ();

processAllRecords ();

【讨论】:

    猜你喜欢
    • 2020-10-15
    • 2016-02-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多