【问题标题】:How to avoid data race with `asio::ip::tcp::iostream`?如何避免与 `asio::ip::tcp::iostream` 的数据竞争?
【发布时间】:2018-06-15 11:53:42
【问题描述】:

我的问题

使用两个线程通过asio::ip::tcp::iostream 发送和接收时如何避免数据竞争?

设计

我正在编写一个使用asio::ip::tcp::iostream 进行输入和输出的程序。该程序通过端口 5555 接受来自(远程)用户的命令,并通过同一 TCP 连接向用户发送消息。因为这些事件(从用户接收的命令或发送给用户的消息)是异步发生的,所以我有单独的发送和接收线程。

在这个玩具版本中,命令是“一”、“二”和“退出”。当然“退出”退出程序。其他命令什么都不做,任何无法识别的命令都会导致服务器关闭 TCP 连接。

传输的消息是简单的序列号消息,每秒发送一次。

在这个玩具版本和我正在尝试编写的真实代码中,传输和接收进程都使用阻塞 IO,因此似乎没有使用std::mutex 或其他同步的好方法机制。 (在我的尝试中,一个进程会抓取互斥体然后阻塞,这对这个不起作用。)

构建和测试

为了构建和测试它,我在 64 位 Linux 机器上使用 gcc 版本 7.2.1 和 valgrind 3.13。构建:

g++ -DASIO_STANDALONE -Wall -Wextra -pedantic -std=c++14 concurrent.cpp -o concurrent -lpthread

为了测试,我使用以下命令运行服务器:

valgrind --tool=helgrind --log-file=helgrind.txt ./concurrent 

然后我在另一个窗口中使用telnet 127.0.0.1 5555 来创建到服务器的连接。 helgrind 正确指出的是存在数据竞争,因为 runTxrunRx 都试图异步访问同一个流:

==16188== 线程 #1 在 0x1FFEFFF1CC 读取大小 1 期间可能存在数据竞争

==16188== 持有锁:无

...省略了更多行

并发.cpp

#include <asio.hpp>
#include <iostream>
#include <fstream>
#include <thread>
#include <array>
#include <chrono>

class Console {
public:
    Console() :
        want_quit{false},
        want_reset{false}
    {}
    bool getQuitValue() const { return want_quit; }
    int run(std::istream *in, std::ostream *out);
    bool wantReset() const { return want_reset; }
private:
    int runTx(std::istream *in);
    int runRx(std::ostream *out);
    bool want_quit;
    bool want_reset;
};

int Console::runTx(std::istream *in) {
    static const std::array<std::string, 3> cmds{
        "quit", "one", "two", 
    };
    std::string command;
    while (!want_quit && !want_reset && *in >> command) {
        if (command == cmds.front()) {
            want_quit = true;
        }
        if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
            want_reset = true;
            std::cout << "unknown command [" << command << "]\n";
        } else {
            std::cout << command << '\n';
        }
    }
    return 0;
}

int Console::runRx(std::ostream *out) {
    for (int i=0; !(want_reset || want_quit); ++i) {
        (*out) << "This is message number " << i << '\n';
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        out->flush();
    }
    return 0;
}

int Console::run(std::istream *in, std::ostream *out) {
    want_reset = false;
    std::thread t1{&Console::runRx, this, out};
    int status = runTx(in);
    t1.join();
    return status;
}

int main()
{
    Console con;
    asio::io_service ios;
    // IPv4 address, port 5555
    asio::ip::tcp::acceptor acceptor(ios, 
            asio::ip::tcp::endpoint{asio::ip::tcp::v4(), 5555});
    while (!con.getQuitValue()) {
        asio::ip::tcp::iostream stream;
        acceptor.accept(*stream.rdbuf());
        con.run(&stream, &stream);
        if (con.wantReset()) {
            std::cout << "resetting\n";
        }
    }
}

【问题讨论】:

标签: c++ multithreading tcp c++14 boost-asio


【解决方案1】:

是的,您正在共享作为流底层的套接字,但没有同步

旁注,与布尔标志相同,可以通过更改轻松“修复”:

std::atomic_bool want_quit;
std::atomic_bool want_reset;

如何解决

说实话,我认为没有好的解决方案。您自己说过:操作是异步的,所以如果您尝试同步进行操作,您将遇到麻烦。

你可以试着想想黑客。如果我们基于相同的底层套接字(文件描述符)创建一个单独的流对象会怎样。这不会是非常简单的,因为这样的流不是 Asio 的一部分。

但我们可以使用 Boost Iostreams 破解一个:

#define BOOST_IOSTREAMS_USE_DEPRECATED
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>

// .... later:

    // HACK: procure a _separate `ostream` to prevent the race, using the same fd
    namespace bio = boost::iostreams;
    bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
    bio::stream<bio::file_descriptor_sink> hack_ostream(fds);

    con.run(stream, hack_ostream);

确实,这在没有竞争的情况下运行(在同一个套接字 are fine 上同时读取和写入,只要您不共享包装它们的非线程安全 Asio 对象)。

我推荐的是:

不要这样做。这是一个杂物。您使事情复杂化,显然是为了避免使用异步代码。我会咬紧牙关。

将 IO 机制从服务逻辑中剔除出来并没有太多工作量。您最终将摆脱随机限制(您可以考虑与多个客户端打交道,您可以完全不使用任何线程等)。

如果您想了解一些中间立场,请查看 stackful coroutines (http://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/spawn.html)

上市

仅供参考

注意,我进行了重构以消除对指针的需求。您没有转让所有权,因此可以参考。如果您不知道如何将引用传递给 bind/std::thread 构造函数,那么技巧就在 std::ref 中,您将看到。

[对于压力测试,我大大减少了延迟。]

Live On Coliru

#include <boost/asio.hpp>
#include <iostream>
#include <fstream>
#include <thread>
#include <array>
#include <chrono>

class Console {
public:
    Console() :
        want_quit{false},
        want_reset{false}
    {}
    bool getQuitValue() const { return want_quit; }
    int run(std::istream &in, std::ostream &out);
    bool wantReset() const { return want_reset; }
private:
    int runTx(std::istream &in);
    int runRx(std::ostream &out);
    std::atomic_bool want_quit;
    std::atomic_bool want_reset;
};

int Console::runTx(std::istream &in) {
    static const std::array<std::string, 3> cmds{
        {"quit", "one", "two"}, 
    };
    std::string command;
    while (!want_quit && !want_reset && in >> command) {
        if (command == cmds.front()) {
            want_quit = true;
        }
        if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
            want_reset = true;
            std::cout << "unknown command [" << command << "]\n";
        } else {
            std::cout << command << '\n';
        }
    }
    return 0;
}

int Console::runRx(std::ostream &out) {
    for (int i=0; !(want_reset || want_quit); ++i) {
        out << "This is message number " << i << '\n';
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        out.flush();
    }
    return 0;
}

int Console::run(std::istream &in, std::ostream &out) {
    want_reset = false;
    std::thread t1{&Console::runRx, this, std::ref(out)};
    int status = runTx(in);
    t1.join();
    return status;
}

#define BOOST_IOSTREAMS_USE_DEPRECATED
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>

int main()
{
    Console con;
    boost::asio::io_service ios;

    // IPv4 address, port 5555
    boost::asio::ip::tcp::acceptor acceptor(ios, boost::asio::ip::tcp::endpoint{boost::asio::ip::tcp::v4(), 5555});

    while (!con.getQuitValue()) {
        boost::asio::ip::tcp::iostream stream;
        acceptor.accept(*stream.rdbuf());

        {
            // HACK: procure a _separate `ostream` to prevent the race, using the same fd
            namespace bio = boost::iostreams;
            bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
            bio::stream<bio::file_descriptor_sink> hack_ostream(fds);

            con.run(stream, hack_ostream);
        }

        if (con.wantReset()) {
            std::cout << "resetting\n";
        }
    }
}

测试:

netcat localhost 5555 <<<quit
This is message number 0
This is message number 1
This is message number 2

commands=( one two one two one two one two one two one two one two three )
while sleep 0.1; do echo ${commands[$(($RANDOM%${#commands}))]}; done | (while netcat localhost 5555; do sleep 1; done)

无限期运行,偶尔会重置连接(当命令“三”已发送时)。

【讨论】:

  • "将 IO 机制从服务逻辑中分离出来并没有太多工作。" 我不明白你的意思或如何做到这一点。你能详细说明一下吗?
  • 当然。在您的代码中,不要将请求循环绑定到istream,而是让它从“任何地方”(插入simplest interface you could think of)接收请求消息,然后您可以随意用异步的东西替换IO实现而不“害怕”复杂性增加。它是隐藏的和分隔的。
  • 好的,将 iO 与处理分开是有道理的,但我不知道您在这种情况下所说的“异步代码”是什么意思。这是我对异步代码的尝试——毫无疑问,有更好的方法来做到这一点,但是如何呢?
  • @Edward 你痛苦地意识到你的代码没有什么是并发的,但不是异步的。这体现在 IO 操作阻塞的事实中。异步 IO 是 Asio 的祸根,您可以通过使用 async_* 版本的调用来获得它。 [这也意味着您将无法使用 iostream 抽象,尽管您可以使用std::[io]streamboost::asio::streambuf 仍然]
  • 好的,我想我会回到 asio 文档再次尝试理解它们。显然我仍然缺少一些基本的东西,但不知道它是什么。不过,我很感激你的时间。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-06-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多