【问题标题】:How can I pass things to a C++20 coroutine co_await await_suspend operator function such as a thread pool reference?如何将事物传递给 C++20 协程 co_await await_suspend 运算符函数,例如线程池引用?
【发布时间】:2022-11-22 02:44:42
【问题描述】:

我正在尝试在 C++ 中一起使用协程和多线程。

在许多协程示例中,它们在 co_await 运算符的 await_suspend 中为 promise 类型创建了一个新线程。我想提交给这个函数中的线程池。

这里我为future<int>定义了一个co_await

void await_suspend(std::coroutine_handle<> handle) {
          this->wait();
          handle.resume();
        }

我想更改此代码以将 lambda/函数指针提交给线程池。潜在地我可以使用 Alexander Krizhanovsky 的 ringbuffer 与线程池通信以自己创建线程池或使用 boost 的线程池。

我的问题不是线程池。我的问题是我不知道如何在这个 co_await 运算符中获取对线程池的引用。

我如何将数据从操作员所在的外部环境传递给这个await_suspend函数?这是我想做的一个例子:

void await_suspend(std::coroutine_handle<> handle) {
    // how do I get "pool"? from within this function
    auto res = pool.enqueue([](int x) { 
          this->wait();
          handle.resume();
    });
          
}

我不是 C++ 专家,所以我不确定如何在此运算符中访问 pool

这是受this GitHub gist A simple C++ coroutine example 启发的完整代码。

#include <future>
#include <iostream>
#include <coroutine>
#include <type_traits>
#include <list>
#include <thread>

using namespace std;



template <>
struct std::coroutine_traits<std::future<int>> {
  struct promise_type : std::promise<int> {
    future<int> get_return_object() { return this->get_future(); }
    std::suspend_never initial_suspend() noexcept { return {}; }
    std::suspend_never final_suspend() noexcept { return {}; }
    void return_value(int value) { this->set_value(value); }
    void unhandled_exception() {
      this->set_exception(std::current_exception());
    }
  };
};

template <>
struct std::coroutine_traits<std::future<int>, int> {
  struct promise_type : std::promise<int> {
    future<int> get_return_object() { return this->get_future(); }
    std::suspend_never initial_suspend() noexcept { return {}; }
    std::suspend_never final_suspend() noexcept { return {}; }
    void return_value(int value) { this->set_value(value); }
    void unhandled_exception() {
      this->set_exception(std::current_exception());
    }
  };
};

auto operator co_await(std::future<int> future) {
  struct awaiter : std::future<int> {
  
    bool await_ready() { return false; } // suspend always
    void await_suspend(std::coroutine_handle<> handle) {
      this->wait();
      handle.resume();
    }
    int await_resume() { return this->get(); }
  };
  return awaiter{std::move(future)};
}

future<int> async_add(int a, int b)
{
    auto fut = std::async([=]() {
        int c = a + b;
        return c;
    });

    return fut;
}

future<int> async_fib(int n)
{
    if (n <= 2)
        co_return 1;

    int a = 1;
    int b = 1;

    // iterate computing fib(n)
    for (int i = 0; i < n - 2; ++i)
    {
        int c = co_await async_add(a, b);
        a = b;
        b = c;
    }

    co_return b;
}

future<int> test_async_fib()
{
    for (int i = 1; i < 10; ++i)
    {
        int ret = co_await async_fib(i);
        cout << "async_fib(" << i << ") returns " << ret << endl;
    }
}

int runfib(int arg) {
  auto fut = test_async_fib();
  fut.wait();
  return 0;
}

int run_thread() {
  printf("Running thread");
  return 0;
}
  
int main()
{
    std::list<shared_ptr<std::thread>> threads = { };
      
  
    for (int i = 0 ; i < 10; i++) {
      printf("Creating thread\n");
      std::shared_ptr<std::thread> thread = std::make_shared<std::thread>(runfib, 5);
      
      threads.push_back(thread);
      
    }
    std::list<shared_ptr<std::thread>>::iterator it;
    for (it = threads.begin(); it != threads.end(); it++) {
      (*it).get()->join();
      printf("Joining thread");
    }
    fflush(stdout);

    return 0;
}

【问题讨论】:

  • 如果您需要其他上下文,则必须以其他方式提供。你可以把它放在 promise 对象中,或者你可以把它放在等待者或适配器中。
  • 我可以在等待代码的区域中提供额外的上下文吗?我想在我的应用程序的 main() 中创建线程池。 promise_type 可以从函数语句中接收数据吗?

标签: c++ multithreading c++-coroutine


【解决方案1】:

您可以有一个线程池,并让协程承诺在其上安排工作。

我有这个例子,它并不十分简单,但可以完成工作:

  • 让您的协程返回 task&lt;T&gt;
task<int> async_add(int a, int b) { ... }
  • task与其coroutine_promise共享一个statestate
    • 被实现为可执行文件,执行时恢复协程,并且
    • 保存操作的结果(例如std::promise&lt;T&gt;)。
template <typename T>
class task<T>::state : public executable {
public:
    void execute() noexcept override {
        handle_.resume();
    }
...
private:
    handle_type handle_;
    std::promise<T> result_;
};
  • coroutine_promiseinitial_suspend 返回task_scheduler 服务员:
template <typename T>
class task<T>::coroutine_promise {
public:
    auto initial_suspend() {
        return task_scheduler<task<T>>{};
    }
  • task_scheduler服务员安排state
template <is_task task_t>
struct task_scheduler : public std::suspend_always {
    void await_suspend(task_t::handle_type handle) const noexcept {
        thread_pool::get_instance().schedule(handle.promise().get_state());
    }
};
  • 总结一下:调用协程将使 state 被安排在线程上,并且,每当线程执行该 state 时,协程将恢复。然后调用者可以等待任务的结果。
auto c{ async_add(a,b) };
b = c.get_result();

[Demo]

【讨论】:

    猜你喜欢
    • 2020-04-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-06-01
    • 1970-01-01
    相关资源
    最近更新 更多