【问题标题】:asio: how to pass object from one io context to anotherasio:如何将对象从一个 io 上下文传递到另一个
【发布时间】:2021-12-14 22:55:18
【问题描述】:

我试图更好地理解异步 asio 的工作原理。

我有以下代码,我在套接字上调用 async_read 以读取接下来的 10 个字节的数据。

struct SocketReader {
    void do_read_body()
    {
        asio::async_read(socket_,
            asio::buffer(msg_, 10),
            [this](asio::error_code ec, std::size_t length)
            {
                if (!ec)
                {
                    //messages_to_work_on.emplace_back(msg_); // <-- I'm trying to send this msg_ instance to another io_context
                    do_read_body(); // call again
                }
                else
                {
                    socket_.close();
                }
            });
    }
std::vector<uint8_t> msg_;
asio::tcp::socket _socket;
}

这些读取是在他自己的 std::thread 中运行的 io_context 内完成的,我在队列中收集从套接字读取的所有消息。到目前为止一切顺利。

我还有另一个“工人”类,它只是根据队列中可用的内容执行一些工作:

struct Worker
{
    asio::io_context& io_context_;
    std::deque< std::vector<uint8_t> > queue;
    Worker(asio::io_context& io_context)
        : io_context_(io_context) {
        asio::post(io_context_, [this]() {doWork();});
    }
    void doWork() {
        if (!queue.empty())
        {
            // do some work with front()
            queue.pop_front();
        }
        asio::post(io_context_, [this]() {doWork();});
    }
};

那个也在他自己的io_context中执行,在他自己的线程中运行。所以socket线程和worker线程之间是有并发的。

将从套接字接收的数据发布到工人类的正确方法是什么? 我在想我应该能够从套接字完成处理程序中调用,例如:

asio::post(worker_io_context, [this]() {worker.queue.push_back(msg_)});

这样,我至少可以确定没有同时使用工作队列。 但我不确定是否允许我从一个 io_context 发布到另一个,以及我是否不会以这种方式创建另一个竞争条件。 我也不太明白我的消息的内存应该放在哪里,尤其是从一个 io_context 到另一个的传输“中间”。是否需要按值传递消息(因为 this.msg_ 可以在执行后处理程序之前修改)?

谢谢!

【问题讨论】:

  • 我不明白为什么这么复杂 - 只需调用函数 doWork() 而不是“messages_to_work_on.emplace_back(msg_);”并且您将获得预期的行为,没有“双端队列”,没有竞争条件,没有头痛,没有两个 io_contexts,没有两个线程。
  • @Heto 关键是 do_work 可能正在进行长时间运行的计算,所以我不想阻塞正在读取套接字的线程(你建议这样做)。相反,我想在另一个线程(另一个 io_context)中发送数据,以便可以在不阻塞其余线程的情况下处理它

标签: c++ boost-asio


【解决方案1】:

我应该能够从套接字完成处理程序中调用, 类似:

asio::post(worker_io_context, [this]() {worker.queue.push_back(msg_)});

当然。

这样,我至少可以确定没有同时使用工作队列。 但我不确定我是否可以从一个 io_context 发布到另一个,

io_context 不是魔法。它们基本上是协作任务队列。

如果我不会以这种方式创建另一个竞争条件。

我不会坐在这里不看你的代码就做出裁决(我可能不想读完所有代码),但让我再说一遍:io_context 不是魔法。您可以根据线程、任务和资源以您已经知道的方式推理它们。

我也不太明白我的消息的内存应该放在哪里,尤其是从一个 io_context 到另一个的传输“中间”。是否需要我按值传递消息(因为 this.msg_ 可以在执行后处理程序之前修改)?

是的。确实。类似的东西

post(worker_io_context, [this, msg=std::move(msg_)]() {worker.queue.push_back(std::move(msg)); });

如果移动并不便宜,则可以选择使用引用计数的智能指针(如 shared_ptr)。如果您实际上在线程之间共享所有权,请考虑将其设为 smartpointer&lt;T const&gt;


淋浴思想:也许你可以不用“工人”队列。由于您正在转向反应器式异步(使用 Asio),因此您可能专注于对任务进行排队,而不是数据。不这样做的原因包括您想要优先排队、负载平衡/背压等。[原则上所有这些都可以使用自定义执行器来实现,但我会坚持在这样做之前我所知道的。]

【讨论】:

  • 谢谢!打字后我有点想通了,我打算用 asio 做的事情是可能的。我现在意识到的是,基本上完成处理程序不是“免费的”,如果参数按值传递,它们可以通过闭包附加大量数据。我还发现存在 think-async.com/Asio/asio-1.11.0/doc/asio/overview/cpp2011/… 可移动处理程序,以潜在地避免在 asio 代码中大量复制重型处理程序(不确定会发生多少)
  • 为了避免昂贵的处理程序,您可以将数据放在处理程序之外。当跨越上下文/执行者时,我想取消共享数据以避免同步问题。另一种典型的模式是“handler-is-operation-type”,其中操作类型可能包含所有相关状态的 unique_ptr/shared_ptr。
  • 我真的不知道如何将数据放在处理程序之外而不回到我最初遇到的同步问题。更糟糕的是,因为可以同时创建许多处理程序并处于挂起状态,所以我不能将我的数据作为全局变量或其他东西放在一个特定的地方传输。就我而言,这很简单,因为数据可以很便宜地移动,因为它都是 std::vectors
  • 它就像你想要的一样简单:shared_ptrs 做得很好,例如当然,shared_ptr 可能与高性能代码背道而驰,并且经常被过度使用,但它实际上非常适合这里的要求。参见例如我在gist.github.com/sehe/…(上下文:chat.stackoverflow.com/transcript/116940?m=53027486#53027486chat.stackoverflow.com/transcript/message/53042186#53042186)中实现了巨大的加速。但是,是的,这完全取决于数据流和访问模式。
【解决方案2】:

注意:您不需要额外的 io_context。
如果您必须进行长时间运行的计算,您可以编写自己的 async_xyz 函数并像其他异步函数一样使用它。这个想法是将工作发布到提升线程池以在那里进行计算,然后在工作完成时调用完成处理程序。这是一个使用 boost 线程池对密码进行耗时散列的示例。

template <boost::asio::completion_token_for<void (std::string)> CompletionToken>
auto
async_hash (boost::asio::thread_pool &pool, boost::asio::io_context &io_context, std::string const &password, CompletionToken &&token)
{
  return boost::asio::async_initiate<CompletionToken, void (std::string)> (
      [&] (auto completion_handler, std::string const &passwordToHash) {
        auto io_eq = boost::asio::prefer (io_context.get_executor (), boost::asio::execution::outstanding_work.tracked);
        boost::asio::post (pool, [&, io_eq = std::move (io_eq), completion_handler = std::move (completion_handler), passwordToHash] () mutable {
          auto hashedPw = pw_to_hash (passwordToHash);
          boost::asio::post (io_eq, [hashedPw = std::move (hashedPw), completion_handler = std::move (completion_handler)] () mutable { completion_handler (hashedPw); });
        });
      },
      token, password);
}

然后调用它:

auto hashedPw = co_await async_hash (pool, io_context, createAccountObject.password, boost::asio::use_awaitable);

看来你不使用协程 ts 所以我认为你必须做这样的事情

async_hash (pool, io_context, createAccountObject.password, /*your lambda here*/);

【讨论】:

  • 我认为 - 有效 - 关心的是分离不同线程池之间的负载。这是一种非常常见的设计模式,实现它的自然方法是让不同的执行上下文由不同的线程/线程池提供服务。否则你最终将无法控制优先级(工作拥塞可能会使 IO 任务饿死等)
  • 我认为可能会有这种情况发生。但是在使用协程 ts 的时候很难想象。我很想看到一个发生这种情况的例子。也许如果我们使用很多线程(当然是在具有很多内核的服务器上)并且除了发布到 io_context 之外它们什么都不做?我们也可以使用这个“boost.org/doc/libs/master/doc/html/boost_asio/reference/…”(我从未尝试过)并为上下文提供更多线程?
  • 使用协程 TS 时完全一样。它基本上只是语法糖。一旦你做了一些繁重的工作(同步),你可能会伤害处理程序的延迟。协程、回调样式、asio::spawn,都归结为同一件事:在执行上下文中安排任务。
  • Re:并发提示:这有助于在已知单线程访问的情况下进行一点优化。在实践中,通过减少默认情况下在所有 Asio IO 对象中发生的类型擦除(由于any_io_executor,请参见此示例chat.stackoverflow.com/transcript/230461?m=51875036#51875036),可以获得更高的增益跨度>
【解决方案3】:

您不需要多个 io_context,即使对于多线程应用程序,您也可以只使用一个 io_context。你想让你的 SocketReader 成为一个共享指针,每次发生读取时,添加一个计数。我假设接受器、套接字创建和 some_io_context.run() 部分已完成。我会这样做:

class SocketReader 
   : public std::enable_shared_from_this<SocketReader> // we need this! 
{
public:
    // Constructor
    SocketReader(io_context& ctx) 
    : ctx_(ctx)
    {
    }
    
    // read
    void do_read_body()
    {
        auto self(this->shared_from_this()); // we need this!
        asio::async_read(socket_,
            asio::buffer(msg_, 10),
            [this](asio::error_code ec, std::size_t length)
            {
                if (!ec)
                {
                    // later edit - @sehe is spot on -> is better to move it
                    asio::post(ctx_, [this, msg=std::move(msg_)]() { // do the long work });
                    do_read_body();
                }
                else
                {
                    socket_.shutdown(asio::tcp::socket::shutdown_send, ec);
                    socket_.close();
                }
            });
    }
    
private:
    
    io_context& ctx_; // l.e. ctx_ must be a reference
    vector<uint8_t> msg_;
    asio::tcp::socket _socket;
};

...
// somewhere in the code
auto s_reader = std::make_shared<SocketReader>(some_io_context);
s_reader->do_read_body();

【讨论】:

  • 好吧,如果我理解正确,如果我使用 2 个线程来运行这个 io_context,那么我的 lambda 和 do_read_body 可能会同时执行?
  • 你不需要两个线程。它们不是“同时”执行的,而是异步执行的。照顾它是 OS/asio 的工作。
  • 但是,如果我希望它们同时执行,我需要 2 个线程对吗?这是我最初的意图
  • 你不知道他们什么时候会被执行。您正在向操作系统请求为您处理此业务,您命名您的条件(处理程序)并且您不会阻止您的线程。当条件满足时,就会被执行。
  • 一些想法:你可以做一个 context_pool/thread_pool。要拥有两个(或更多)线程,每个线程都有自己的 io_context,一个用于创建套接字,一个(或更多,如果您觉得有用)来完成工作。无需保护您的数据(使用互斥量/信号量),因为您没有在线程之间共享任何数据。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-09-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-01-10
相关资源
最近更新 更多