【问题标题】:Can I use std::async without waiting for the future limitation?我可以在不等待未来限制的情况下使用 std::async 吗?
【发布时间】:2014-02-27 03:49:57
【问题描述】:

高级
我想在异步模式下调用一些没有返回值的函数,而无需等待它们完成。如果我使用 std::async 未来对象在任务结束之前不会破坏,这会使调用在我的情况下不同步。

示例

void sendMail(const std::string& address, const std::string& message)
{
    //sending the e-mail which takes some time...
}

myResonseType processRequest(args...)
{
    //Do some processing and valuate the address and the message...

    //Sending the e-mail async
    auto f = std::async(std::launch::async, sendMail, address, message);

    //returning the response ASAP to the client
    return myResponseType;

} //<-- I'm stuck here until the async call finish to allow f to be destructed.
  // gaining no benefit from the async call.

我的问题是

  1. 有没有办法克服这个限制?
  2. 如果 (1) 为否,我是否应该执行一次线程来获取那些“僵尸”期货并等待它们?
  3. (1) 和 (2) 都没有,除了自己构建线程池还有其他选择吗?

注意:
我宁愿不使用线程+分离选项(由@galop1n 建议),因为创建一个新线程有我希望避免的开销。使用 std::async 时(至少在 MSVC 上)正在使用内部线程池。

谢谢。

【问题讨论】:

  • cannot do that with async 设计的。
  • 我知道我做不到。我要问的是是否有人有一种简单的方法来扩展基本的 std::async 来做到这一点。如果不是,我应该采取什么方法来实现这一目标。 (可能根本不使用 std::async)。
  • 嗯,这回答了你的标题问题:) 如果你想要线程池,也许你可以明确地做到这一点,而不是依赖于实现细节。
  • 如果您需要发送多封电子邮件,请考虑使用异步代理库(VS 附带的 PPL 的一部分)。 msdn.microsoft.com/en-us/library/dd492627.aspx
  • 只需forksendMail 的另一个进程,然后忘记它。 :)

标签: c++ multithreading c++11 asynchronous stdasync


【解决方案1】:

您可以将未来移动到全局对象中,因此当本地未来的析构函数运行时,它不必等待异步线程完成。

std::vector<std::future<void>> pending_futures;

myResonseType processRequest(args...)
{
    //Do some processing and valuate the address and the message...

    //Sending the e-mail async
    auto f = std::async(std::launch::async, sendMail, address, message);

    // transfer the future's shared state to a longer-lived future
    pending_futures.push_back(std::move(f));

    //returning the response ASAP to the client
    return myResponseType;

}

注意如果异步线程引用了processRequest 函数中的任何局部变量,这是不安全的。

虽然使用 std::async(至少在 MSVC 上)使用的是内部线程池。

这实际上是不符合标准的,标准明确规定使用std::launch::async 运行的任务必须像在新线程中一样运行,因此任何线程局部变量不得从一个任务持续到另一个任务。不过这通常并不重要。

【讨论】:

  • 这是一个糟糕的方法,我最终会得到一个不断增长的向量 (pending_futuers)
  • 所以定期检查向量并删除准备好的期货。您可以将其添加到 processRequest 函数中,因此每次调用它时,您都会查看是否有任何现成的期货可以从向量中删除。这并不复杂。
  • 你的问题是如何避免在 future 析构函数中等待,我回答了。如果您想创建自己的线程池,那很好(尽管我怀疑您的线程池与 Windows 运行时中的线程池一样高效),但这不会改变您最初的要求。
  • 如果在类方法中调用了std::async,你可以将它返回的future赋值给类成员。这种等待未来的方式将被推迟到类被破坏。
  • @starfury 不,你不能,那不会编译。 f 是一个左值,如果不把它变成一个 rvaue,你就不能从它构建另一个未来。
【解决方案2】:

如果你不关心加入,为什么不直接启动一个线程并分离?

std::thread{ sendMail, address, message}.detach();   

std::async 绑定到它返回的 std::future 的生命周期,并且它们无法替代。

将 std::future 放入由其他线程读取的等待队列将需要与接收新任务的池相同的安全机制,例如容器周围的互斥锁。

那么,您最好的选择是使用线程池来使用直接推送到线程安全队列中的任务。而且它不依赖于具体的实现。

在采用任何可调用和参数的线程池实现之下,线程在队列上进行轮询,更好的实现应该使用条件变量 (coliru):

#include <iostream>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <functional>
#include <string>

struct ThreadPool {
    struct Task {
        virtual void Run() const = 0;
        virtual ~Task() {};
    };   

    template < typename task_, typename... args_ >
    struct RealTask : public Task {
        RealTask( task_&& task, args_&&... args ) : fun_( std::bind( std::forward<task_>(task), std::forward<args_>(args)... ) ) {}
        void Run() const override {
            fun_();
        }
    private:
        decltype( std::bind(std::declval<task_>(), std::declval<args_>()... ) ) fun_;
    };

    template < typename task_, typename... args_ >
    void AddTask( task_&& task, args_&&... args ) {
        auto lock = std::unique_lock<std::mutex>{mtx_};
        using FinalTask = RealTask<task_, args_... >;
        q_.push( std::unique_ptr<Task>( new FinalTask( std::forward<task_>(task), std::forward<args_>(args)... ) ) );
    }

    ThreadPool() {
        for( auto & t : pool_ )
            t = std::thread( [=] {
                while ( true ) {
                    std::unique_ptr<Task> task;
                    {
                        auto lock = std::unique_lock<std::mutex>{mtx_};
                        if ( q_.empty() && stop_ ) 
                            break;
                        if ( q_.empty() )
                            continue;
                        task = std::move(q_.front());
                        q_.pop();
                    }
                    if (task)
                        task->Run();
                }
            } );
    }
    ~ThreadPool() {
        {
            auto lock = std::unique_lock<std::mutex>{mtx_};
            stop_ = true;
        }
        for( auto & t : pool_ )
            t.join();
    }
private:
    std::queue<std::unique_ptr<Task>> q_;
    std::thread pool_[8]; 
    std::mutex mtx_;
    volatile bool stop_ {};
};

void foo( int a, int b ) {
    std::cout << a << "." << b;
}
void bar( std::string const & s) {
    std::cout << s;
}

int main() {
    ThreadPool pool;
    for( int i{}; i!=42; ++i ) {
        pool.AddTask( foo, 3, 14 );    
        pool.AddTask( bar, " - " );    
    }
}

【讨论】:

  • std::async 使用 MSVC 编译时使用内部线程池。每次自己创建一个线程都会产生我希望避免的性能开销。
  • 程序存在数据竞争:ThreadPool~ThreadPool 可能同时访问 stop_volatile 没有用于多线程的有用(可移植)语义:它必须是 std::atomic~ThreadPool 需要在保持 mtx_ 的情况下访问它。您的线程也忙着等待,最好在队列为空时阻塞条件变量。
  • @Casey 这只是可变任务队列的概念证明,我试图让它尽可能简单。另外,我从来没有认真尝试过 c++11 条件变量(只有本地变量),也不想错过任何东西,事实上,我此时使用该示例来测试它们。我添加了一条关于在实际用例中使用条件变量的说明。
  • 您在ThreadPool 中的线程忙于自旋浪费CPU。了解如何使用条件变量。
  • 我个人不太喜欢微软对 std::async 的非一致性实现。我为一位同事对此进行了调查,第一次调用 std::async 时只有开销。在第一次调用时,所有线程都被初始化并至少使用 VS2010 进入等待状态
【解决方案3】:

与其将未来移动到一个全局对象(并手动管理删除未使用的未来),您实际上可以将它移动到异步调用函数的本地范围 .

“让异步函数拥有自己的未来”,可以这么说。

我想出了这个适合我的模板包装器(在 Windows 上测试):

#include <future>

template<class Function, class... Args>
void async_wrapper(Function&& f, Args&&... args, std::future<void>& future,
                   std::future<void>&& is_valid, std::promise<void>&& is_moved) {
    is_valid.wait(); // Wait until the return value of std::async is written to "future"
    auto our_future = std::move(future); // Move "future" to a local variable
    is_moved.set_value(); // Only now we can leave void_async in the main thread

    // This is also used by std::async so that member function pointers work transparently
    auto functor = std::bind(f, std::forward<Args>(args)...);
    functor();
}

template<class Function, class... Args> // This is what you call instead of std::async
void void_async(Function&& f, Args&&... args) {
    std::future<void> future; // This is for std::async return value
    // This is for our synchronization of moving "future" between threads
    std::promise<void> valid;
    std::promise<void> is_moved;
    auto valid_future = valid.get_future();
    auto moved_future = is_moved.get_future();

    // Here we pass "future" as a reference, so that async_wrapper
    // can later work with std::async's return value
    future = std::async(
        async_wrapper<Function, Args...>,
        std::forward<Function>(f), std::forward<Args>(args)...,
        std::ref(future), std::move(valid_future), std::move(is_moved)
    );
    valid.set_value(); // Unblock async_wrapper waiting for "future" to become valid
    moved_future.wait(); // Wait for "future" to actually be moved
}

我有点惊讶它的工作原理,因为我认为移动的未来的析构函数会阻塞,直到我们离开 async_wrapper。它应该等待 async_wrapper 返回,但它正在那个函数内部等待。从逻辑上讲,它应该是一个死锁,但事实并非如此。

我还尝试在async_wrapper的末尾添加一行来手动清空future对象:

our_future = std::future<void>();

这也不会阻塞。

【讨论】:

  • 将作业入队的线程将在moved_future.wait() 处阻塞,直到线程池启动此异步作业。如果它忙于处理以前排队的工作,这个摊位将很重要。
  • 在 windows 上工作,但不幸的是不能用 NDK 编译
【解决方案4】:

您需要将future 设为指针。以下正是您要查找的内容:

std::make_unique<std::future<void>*>(new auto(std::async(std::launch::async, sendMail, address, message))).reset();

Live example

【讨论】:

  • 感谢您的想法,但它不起作用。 static variables are only visible to the block they are defined in 是真的,但从某种意义上说,它们也是单例,对这个函数的两次调用将使用相同的 f,这将使第二次调用在这条线上被阻塞,直到第一次调用 future 回来。
  • @RoeeGavirel 我刚刚编辑了我的答案。没有更多的新代码块。玩得开心..
  • 自从我用 C/C++ 编程的日子以来,我早就离开了,但它不会留下一个悬空指针并导致内存泄漏。
  • @RoeeGavirel unique_ptr 是一个智能指针,它提供了自动内存管理的附加功能。此外,我在创建 unique_ptr 后立即通过reset(); 销毁对象,因此绝对不可能导致内存泄漏。
【解决方案5】:

我不知道我在做什么,但这似乎有效:

// :( http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2012/n3451.pdf
template<typename T>
void noget(T&& in)
{
    static std::mutex vmut;
    static std::vector<T> vec;
    static std::thread getter;
    static std::mutex single_getter;
    if (single_getter.try_lock())
    {
        getter = std::thread([&]()->void
        {
            size_t size;
            for(;;)
            {
                do
                {
                    vmut.lock();
                    size=vec.size();
                    if(size>0)
                    {
                        T target=std::move(vec[size-1]);
                        vec.pop_back();
                        vmut.unlock();
                        // cerr << "getting!" << endl;
                        target.get();
                    }
                    else
                    {
                        vmut.unlock();
                    }
                }while(size>0);
                // ¯\_(ツ)_/¯
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
        });
        getter.detach();
    }
    vmut.lock();
    vec.push_back(std::move(in));
    vmut.unlock();
}

它为你扔给它的每种类型的未来创建一个专用的 getter 线程(例如,如果你给它一个未来和未来,你将有 2 个线程。如果你给它 100 倍未来,你仍然只有 2 个线程),当有一个你不想处理的未来时,只需执行notget(fut); - 你也可以noget(std::async([]()-&gt;void{...})); 工作得很好,似乎没有阻塞。警告,not 在使用 noget() 之后尝试从未来获取值。那可能是 UB 和自找麻烦。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-07-21
    • 2012-06-13
    • 2017-09-30
    • 2020-02-06
    • 2016-01-25
    相关资源
    最近更新 更多