【问题标题】:boost asio need to post n jobs only after m jobs have finishedboost asio 需要在 m 个工作完成后才发布 n 个工作
【发布时间】:2011-12-08 22:16:00
【问题描述】:

我正在寻找一种方法来等待多个作业完成,然后执行另一个完全不同数量的作业。当然,使用线程。简要说明: 我创建了两个工作线程,都在 io_service 上运行。下面的代码取自here

为了简单起见,我创建了两种类型的作业,CalculateFib i CalculateFib2。我希望 CalculateFib2 作业CalculateFib 作业完成之后开始。我尝试按照here 的说明使用条件变量,但如果CalculateFib2 作业不止一个,程序就会挂起。我做错了什么?

谢谢,dodol

#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;
boost::mutex mx;
boost::condition_variable cv;

void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service)
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Start" << std::endl;
    global_stream_lock.unlock();

    io_service->run();

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Finish" << std::endl;
    global_stream_lock.unlock();
}

size_t fib( size_t n )
{
    if ( n <= 1 )
    {
        return n;
    }
    boost::this_thread::sleep( 
        boost::posix_time::milliseconds( 1000 )
        );
    return fib( n - 1 ) + fib( n - 2);
}

void CalculateFib( size_t n )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Now calculating fib( " << n << " ) " << std::endl;
    global_stream_lock.unlock();

    size_t f = fib( n );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] fib( " << n << " ) = " << f << std::endl;
    global_stream_lock.unlock();

    boost::lock_guard<boost::mutex> lk(mx);
    cv.notify_all();
}

void CalculateFib2( size_t n )
{
    boost::unique_lock<boost::mutex> lk(mx);
    cv.wait(lk);

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Now calculating fib2( " << n << " ) " << std::endl;
    global_stream_lock.unlock();

    size_t f = fib( n );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] fib2( " << n << " ) = " << f << std::endl;
    global_stream_lock.unlock();
}
int main( int argc, char * argv[] )
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
        );
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( *io_service )
        );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] The program will exit when all work has finished."
        << std::endl;
    global_stream_lock.unlock();

    boost::thread_group worker_threads;
    for( int x = 0; x < 2; ++x )
    {
        worker_threads.create_thread( 
            boost::bind( &WorkerThread, io_service )
            );
    }
    io_service->post( boost::bind( CalculateFib, 5 ) );
    io_service->post( boost::bind( CalculateFib, 4 ) );
    io_service->post( boost::bind( CalculateFib, 3 ) );

    io_service->post( boost::bind( CalculateFib2, 1 ) );
    io_service->post( boost::bind( CalculateFib2, 1 ) );
    work.reset();
    worker_threads.join_all();

    return 0;
}

【问题讨论】:

    标签: c++ multithreading boost boost-asio boost-thread


    【解决方案1】:

    CalculateFib2 中,您要做的第一件事就是等待条件 (cv)。此条件CalculateFib 的末尾发出信号。因此,执行从不继续,除非触发条件(通过发布CalculateFib)作业。

    确实,像这样添加任何其他行:

    io_service->post( boost::bind( CalculateFib, 5 ) );
    io_service->post( boost::bind( CalculateFib, 4 ) );
    io_service->post( boost::bind( CalculateFib, 3 ) );
    
    io_service->post( boost::bind( CalculateFib2, 1 ) );
    io_service->post( boost::bind( CalculateFib2, 1 ) );
    
    io_service->post( boost::bind( CalculateFib, 5 ) );   // <-- ADDED
    

    使执行运行完成。

    为了阐明更多信息:如果您像这样(及时)隔离 Fib2 批次

    io_service->post( boost::bind( CalculateFib, 5 ) );
    io_service->post( boost::bind( CalculateFib, 4 ) );
    io_service->post( boost::bind( CalculateFib, 3 ) );
    
    boost::this_thread::sleep(boost::posix_time::seconds( 10 ));
    io_service->post( boost::bind( CalculateFib2, 1 ) );
    io_service->post( boost::bind( CalculateFib2, 1 ) );
    io_service->post( boost::bind( CalculateFib2, 1 ) );
    io_service->post( boost::bind( CalculateFib2, 1 ) );
    io_service->post( boost::bind( CalculateFib2, 1 ) );
    io_service->post( boost::bind( CalculateFib2, 1 ) );
    

    无论线程数如何,所有 Fib2 作业都将始终阻塞,因为 Fib 作业在发布之前已全部退出。一个简单的

    io_service->post( boost::bind( CalculateFib, 1 ) );
    

    将解锁所有等待者(即只有等待线程的数量,即可用线程的数量 -1,因为 Fib() 作业也占用一个线程。现在有 (所有线程都被阻塞在 Fib2 中等待)


    老实说,我不明白您在日程安排方面想要达到的目标。我怀疑您应该仅在达到所需数量的项目时才监视作业队列并明确发布作业(“任务”)。这样您就可以 KISS 并获得一个非常灵活的工作安排界面。

    通常,对于线程组(池),您希望避免无限期地阻塞线程。这有可能使您的工作安排陷入僵局,否则可能会表现不佳。

    【讨论】:

    • 是的,我只想在其他一些工作完成后发布工作,因为它们取决于第一组工作创建的结果。就是不知道应该怎么做,也就是怎么知道job group什么时候结束。顺便说一句,谢谢您对上述主题的全面解释。
    • @dodol:你描述的意图可能仍然意味着许多不同的东西。但是,我会考虑简单地从“依赖作业”中发布后续作业(因此您可以将作业线程函数作为参数提供给第一个作业 - 这非常类似于 continuation passing style)
    • 我希望能够在多个线程上发布一组作业,然后当作业结束时,另一组作业也在多个线程上。这个循环循环了很多次,所以我认为最好的办法是在循环开始时创建 io_service 并创建线程,所以我不会在每次循环开始/结束时创建和销毁线程。问题是我不知道什么时候应该开始第二组工作,因为我不知道什么时候完成第一组工作。有什么想法吗?
    • 这个调用听起来像是一个信号量的案例,根据第一组中的作业数进行初始化。然后,当它达到零时,开始第二组。然而,并非所有操作系统都支持信号量计数的“窥探”。另外,我上面提到的 CPS 并不意味着你不能使用线程池。
    猜你喜欢
    • 1970-01-01
    • 2021-12-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-08-14
    • 1970-01-01
    • 2021-08-05
    相关资源
    最近更新 更多