【问题标题】:ASIO: getting deadlock when using several strands and threads with one io_contextASIO:在一个 io_context 中使用多个链和线程时出现死锁
【发布时间】:2021-07-01 08:11:14
【问题描述】:

我正在 Windows 上使用 Mingw64 编译最新版本的 ASIO。

我有一个用于接受 tcp 连接的沙箱代码。我使用一个上下文,每个接受器一个链和一个套接字和 2 个线程(我在文档中读到,发布到两个不同的链中并不能保证并发调用)。由于某种原因,我在执行结束时遇到了死锁,我不知道为什么会发生这种情况。如果出现以下情况,则不会发生:

  • 我使用 1 个线程和一个公共上下文
  • 有时当我使用 1 个上下文和 2 个没有线程的线程时
  • 我使用 2 个不同的上下文和 2 个没有链的不同线程
  • std::future 同步和请求停止服务器之间经过一段时间
  • 有时当我明确地将acceptor.cancel() 发布给它的执行者时

如果我close接受者,也不会发生死锁。

我未能在文档中找到任何可能解释此类行为原因的相关信息。而且我不想忽略它,因为它可能会导致无法预料的问题。

这是我的沙盒代码:


#include <asio.hpp>
#include <iostream>
#include <sstream>
#include <functional>

constexpr const char localhost[] = "127.0.0.1";
constexpr unsigned short port = 12000;

void runContext(asio::io_context &io_context)
{
    std::string threadId{};
    std::stringstream ss;
    ss << std::this_thread::get_id();
    ss >> threadId;
    std::cout << std::string("New thread for asio context: ")
                 + threadId + "\n";
    std::cout.flush();

    io_context.run();

    std::cout << std::string("Stopping thread: ")
                 + threadId + "\n";
    std::cout.flush();
};

class server
{
public:

    template<typename Executor>
    explicit server(Executor &executor)
            : acceptor_(executor)
    {
        using asio::ip::tcp;
        auto endpoint = tcp::endpoint(asio::ip::make_address_v4(localhost),
                                      port);
        acceptor_.open(endpoint.protocol());
        acceptor_.set_option(tcp::acceptor::reuse_address(true));
        acceptor_.bind(endpoint);
        acceptor_.listen();
    }

    void startAccepting()
    {
        acceptor_.async_accept(
                [this](const asio::error_code &errorCode,
                       asio::ip::tcp::socket peer)
                {
                    if (!errorCode)
                    {
                        startAccepting();
//                        std::cout << "Connection accepted\n";
                    }
                    if (errorCode == asio::error::operation_aborted)
                    {
//                        std::cout << "Stopped accepting connections\n";
                        return;
                    }
                });
    }

    void startRejecting()
    {
        // TODO: how to reject?
    }

    void stop()
    {
        // asio::post(acceptor_.get_executor(), [this](){acceptor_.cancel();}); // this line fixes deadlock
        acceptor_.cancel();
        // acceptor_.close(); // this line also fixes deadlock
    }

private:
    asio::ip::tcp::acceptor acceptor_;
};

int main()
{
    setvbuf(stdout, NULL, _IONBF, 0);
    asio::io_context context;

    // run server
    auto serverStrand = asio::make_strand(context);
    server server{serverStrand};
    server.startAccepting();

    // run client
    auto clientStrand = asio::make_strand(context);
    asio::ip::tcp::socket socket{clientStrand};

    size_t attempts = 1;
    auto endpoint = asio::ip::tcp::endpoint(
            asio::ip::make_address_v4(localhost), port);

    std::future<void> res = socket.async_connect(endpoint, asio::use_future);

    std::future<void> runningContexts[] = {
            std::async(std::launch::async, runContext, std::ref(context)),
            std::async(std::launch::async, runContext, std::ref(context))
    };

    res.get();
    server.stop();
    std::cout << "Server has been requested to stop" << std::endl;

    return 0;
}

更新
根据sehe的回答,我遇到了死锁,因为当调用server.stop() 时,已经发布了成功接受的完成处理程序,但由于取消从未调用过,这导致上下文有待处理的工作,因此在结束(如果我理解正确的话)。 我仍然不明白的事情是:

  1. 服务器有一个单独的分支(根据规范)强制接受者的命令以非并发和先进先出的顺序被调用。没有提供执行程序的处理程序也必须在同一个线程中处理。文档中没有关于acceptor::cancel() 方法的线程安全性,尽管不同的acceptor 对象是安全。所以我假设它是线程安全的(在一个strand 内不可能有数据竞争)。 如果cancel 通过asio::post 显式发布到acceptor 的线程中,@sehe 的代码不会导致死锁。对于 500 次调用,没有死锁:
test 499
Awaiting client
New thread 3
New thread 2
Completed client
Server stopped
Accept: 127.0.0.1:14475
Accept: The I/O operation has been aborted because of either a thread exit or an application request.
Stopping thread: 2
Stopping thread: 3
Everyting shutdown

但是,如果我在同步之前删除打印代码和导致延迟的stop(),则很容易实现死锁:

PS C:\dev\builds\asio_connection_logic\Release-MinGW-w64\bin> for ($i = 0; $i -lt 500; $i++){
>> Write-Output "
>> test $i"
>> .\sb.sf_example.exe}

test 0
New thread 2
New thread 3
Server stopped
Accept: 127.0.0.1:15160
PS C:\dev\builds\asio_connection_logic\Release-MinGW-w64\bin>
PS C:\dev\builds\asio_connection_logic\Release-MinGW-w64\bin> for ($i = 0; $i -lt 500; $i++){
>> Write-Output "
>> test $i"
>> .\sb.sf_example.exe}

test 0
New thread 2New thread 3

Server stopped
Accept: 127.0.0.1:15174
PS C:\dev\builds\asio_connection_logic\Release-MinGW-w64\bin> ^C

所以,结论是不管你怎么调用acceptor.cancel(),都会死锁。

  1. 有没有办法避免acceptor死锁?

【问题讨论】:

  • 我必须编辑你的代码才能真正使用 boost asio。您是要标记独立的 Asio 吗?
  • @sehe 我使用了独立的 asio,但标记了 boost,因为有更多标签
  • 知道了。我不怪你。幸运的是,无论如何诊断都是一样的。

标签: c++ multithreading boost-asio asio


【解决方案1】:

我对代码做了一些抽脂,并添加了一些追踪:

Live On Wandbox

#include <boost/asio.hpp>
#include <iostream>
#include <functional>

constexpr unsigned short port = 12000;
namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;

void runContext(asio::io_context& io_context) {
    std::cout << "New thread " << std::this_thread::get_id() << std::endl;
    io_context.run();
    std::cout << "Stopping thread: " << std::this_thread::get_id() << std::endl;
}

class server {
  public:
    template <typename Executor>
    explicit server(Executor executor) : acceptor_(executor, {{}, port}) {
        acceptor_.set_option(tcp::acceptor::reuse_address(true));
    }

    void startAccepting() {
        acceptor_.listen();
        acceptLoop();
    }

    void stop()
    {
        //asio::post(acceptor_.get_executor(), [this]() {
            //acceptor_.cancel();
        //}); // this line fixes deadlock
        acceptor_.cancel();
        // acceptor_.close(); // this line also fixes deadlock
    }

  private:
    void acceptLoop() {
        acceptor_.async_accept([this](error_code errorCode, tcp::socket peer) {
            if (!errorCode) {
                std::cout << "Accept: " << peer.remote_endpoint() << std::endl;
                acceptLoop();
            } else {
                std::cout << "Accept: " << errorCode.message() << std::endl;
            }
        });
    }

    tcp::acceptor acceptor_;
};

int main() {
    setvbuf(stdout, NULL, _IONBF, 0);
    asio::io_context context;

    // run server
    server server{make_strand(context)};
    server.startAccepting();

    // run client
    tcp::socket socket{make_strand(context)};
    std::future<void> res = socket.async_connect({ {}, port}, asio::use_future);

    std::thread t1(runContext, std::ref(context));
    std::thread t2(runContext, std::ref(context));

    std::cout << "Awaiting client " << std::endl;

    res.get();

    std::cout << "Completed client" << std::endl;

    server.stop();

    std::cout << "Server stopped" << std::endl;

    t1.join();
    t2.join();
    std::cout << "Everyting shutdown" << std::endl;
}

正如您所看到的“正确”运行打印:

Awaiting client 
New thread 140712013797120
Accept: New thread 140712005404416
127.0.0.1:57500
Completed client
Server stopped
Accept: Operation canceled
Stopping thread: 140712013797120
Stopping thread: 140712005404416
Everyting shutdown

但是,会打印“错误运行”:

New thread 140544269350656
Awaiting client 
New thread 140544260957952
Completed client
Server stopped
Accept: 127.0.0.1:48580
^C

关键在这里:

Server stopped
Accept: 127.0.0.1:48580

取消发生在接受之前。这意味着有一场比赛,async_accept 的完成处理程序已经在飞行中,errorCode 没有失败。 (换句话说,async_connect 返回的时间比服务器能够处理其 async_accept 完成的时间早一点。)

确实,在链上发帖是解决问题的一种方法。这是因为任何正在运行的处理程序都将在取消之前运行,如果异步操作处于挂起状态,它将被取消。

注意:acceptor_.close() 的另一种方法会调用未定义的行为,因为 acceptor_ 本身存在数据竞争(这不是线程安全的)。


皮皮鬼

旁白:小毛病:

一个“问题”是std::launch::async。我不使用它。我认为它的行为在某种程度上是实现定义的,这使得它不是很有用。也许,请改用std::thread,因为这就是您所追求的,在这里。在最近的提升中,使用asio::thread_pool(2)

这里的答案提供了一些启示Why should I use std::async?

对已编辑问题的更新

你是对的,它具有相同的竞争条件 - 尽管没有 UB,所以这很好。

旁注:您可能应该停止称其为“死锁”,因为没有。这是一个软锁,你只是在等待永远不会发生的事情(async_connect)。死锁是指多方以永远无法满足的方式争夺锁。这只是一个软锁,因为连接甚至网络故障都将允许系统继续。

所以我也剥离了输出,但添加了 BOOST_ASIO_ENABLE_HANDLER_TRACKING。结果图片证实了上面的确切解释:

从这里看来,唯一明显的解决方案似乎是:

  1. 不要从客户端链中取消服务器。通过扩展这将意味着

    • 对客户端和服务器使用相同的链(在进程间通信中不可行)
    • 信号关闭内部连接,以便消息在服务器链上被接收。这是“使关闭命令成为协议的一部分”的方式。
  2. 发布close 而不是cancel,这会主动使任何async_accept 出错

  3. 或者,在服务器中有一个状态机,在启动一个新的async_accept之前手动检查我们是否仍然应该接受。

注意

  • 在使用本机句柄的代码中使用.close() 方法可能会导致其自己的竞争条件(另一个线程立即使用重用的文件描述符打开一个新文件/套接字,而本机代码没有注意到它正在说话到错误的插座)。老实说,这主要是流套接字(不是接受器)的问题,用.shutdown() 很容易解决,所以请注意。

  • 我在 ASIO 的大量生产使用中从未遇到过这个问题。我想,在实践中你的确切用例(.cancel() 完美地与新的accept 完成一起计时)并没有出现很多

  • 确实经常出现的一个类似用例是计时器,它也很难处理无竞争的情况。在那里,消除歧义的因素是也是额外的状态,形式为basic_waitable_timer::expiry()。参见例如Cancelling boost asio deadline timer safely

【讨论】:

  • 我在调用server.stop() 之前使用了未来同步。在连接被接受之前,future 怎么会返回?
  • 我使用 std::async 因为 RAII - 它在销毁时“加入”。而且我找不到在多个线程中启动的io_contextthread_pool 之间的明显区别(它是否隐式使用指定数量的线程?)。
  • strand 的生命周期如何?从您的帖子中,我认为如果只有一个对象使用它,则没有必要使其持久化。
  • 确实如此。在执行器模型中,执行器引用链执行器服务内部的共享链实现。同样,始终按值传递执行程序。
  • io_contextthread_pool 上:参见e.g. - 是的,它在n 个线程(默认或指定)上显式 运行它们。除非它会正确地这样做,例如面对处理程序异常:stackoverflow.com/a/44500924/85371
猜你喜欢
  • 2023-03-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-07-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多