【问题标题】:Boost::ASIO multithreaded writing stale data to socket?Boost::ASIO 多线程将陈旧数据写入套接字?
【发布时间】:2015-02-22 05:51:28
【问题描述】:

我目前正在使用一个小型 servlet 通过 TCP 发送模拟数据,使用 boost::asio 作为网络部分。我已经设法在我的机器上的两个进程之间进行通信(简单的客户端是用 Python 编写的)。问题在于相同的数据一直通过套接字发送,而不是被更新。

我正在使用两个线程:一个运行模拟,创建数据,并使用当前数据更新服务器的连接对象。第二个运行服务器,并且每隔一段时间将当前数据写入套接字。我在这里创建了一个与您分享的最小示例(它使用 MSVC++ 12.0 编译,如果您想复制,我正在谈论的问题)。

tcp_server * server;
bool connected = false;

void runServer() {
    try
    {
        boost::asio::io_service io_service;
        server = new tcp_server(io_service);

        connected = true;
        io_service.run();
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
}

void runSim() {
    for (int i = 0; i < 1000; i++) {
        if (connected)
            server->setData("Current Message: " + std::to_string(i));

        boost::this_thread::sleep(boost::posix_time::seconds(1));
    }
}

int _tmain(int argc, _TCHAR* argv[])
{
    boost::thread serverThread(runServer);
    boost::thread simThread(runSim);

    simThread.join();
    serverThread.join();

    return 0;
}

这里有两个类,TCP_Connection 和 TCP_Server。这些非常接近于现在在 boost 网站上的 boost::asio 教程中找到的那些。

class tcp_connection
    : public boost::enable_shared_from_this<tcp_connection>
{
public:
    typedef boost::shared_ptr<tcp_connection> pointer;

    static pointer create(boost::asio::io_service& io_service)
    {
        return pointer(new tcp_connection(io_service));
    }

    tcp::socket& socket()
    {
        return socket_;
    }

    void start()
    {
        message_ = make_daytime_string();

        boost::asio::async_write(socket_, boost::asio::buffer(message_),
            boost::bind(&tcp_connection::handle_write, shared_from_this()));
    }

    void setData(std::string msg) {
        boost::unique_lock<boost::shared_mutex> msgLock(msgMutex, boost::try_to_lock);
        if (msgLock.owns_lock()) {
            message_ = msg;
        }
    }

private:
    tcp_connection(boost::asio::io_service& io_service)
        : socket_(io_service)
    {
        timer_ = new boost::asio::deadline_timer(io_service,boost::posix_time::milliseconds(250));
    }

    void handle_write()
    {
        boost::shared_lock<boost::shared_mutex> msgLock(msgMutex);
        std::cout << "Writing to socket: " << message_ << std::endl;
        boost::asio::write(socket_, boost::asio::buffer(message_));     

        timer_->expires_at(timer_->expires_at() + boost::posix_time::milliseconds(1500));
        timer_->async_wait(boost::bind(&tcp_connection::handle_write, shared_from_this())); 
    }

    tcp::socket socket_;
    std::string message_;
    int counter_;
    boost::asio::deadline_timer * timer_;
    boost::shared_mutex msgMutex;
};

class tcp_server
{
public:
    tcp_server(boost::asio::io_service& io_service)
        : acceptor_(io_service, tcp::endpoint(tcp::v4(), 13))
    {
        start_accept();
    }

    void setData(std::string msg) {
        if (current_connection != NULL) {
            current_connection->setData(msg);
        }
    }

private:
    void start_accept()
    {
        tcp_connection::pointer new_connection =
            tcp_connection::create(acceptor_.get_io_service());

        acceptor_.async_accept(new_connection->socket(),
            boost::bind(&tcp_server::handle_accept, this, new_connection,
            boost::asio::placeholders::error));

        current_connection = new_connection;
    }

    void handle_accept(tcp_connection::pointer new_connection,
        const boost::system::error_code& error)
    {
        if (!error)
        {
            new_connection->start();
            std::cout << "New Connection on 127.0.0.1" << std::endl;
        }

        start_accept();
    }

    tcp::acceptor acceptor_;
    tcp_connection::pointer current_connection;
};

通过明智地使用 std::cout,我设法确定服务器线程正在从模拟线程获取数据,并且连接对象也正在传递(因为 setData() 方法正在在它应该调用的时候调用)。无论出于何种原因,连接的成员“message_”似乎没有更新。我也知道连接没有被重置或从“新连接”更新到控制台重新创建。

【问题讨论】:

  • 您似乎将同步 boost::asio::write() 与异步 boost::asio::async_write() 操作混合在一起。同样,async_wait() 的完成处理程序为什么要在套接字上写入另一个数据流?
  • 好吧,我这样做是因为这是我可以弄清楚如何在传输之间设置一个固定时间段的唯一方法(async_wait 位于具有指定时间段的截止时间计时器上)。我的想法是让写入重复发生,数据由另一个线程更新,这就是完成处理程序启动另一个写入并等待的原因。
  • 我应该补充一点,我只添加了使用阻塞 boost::asio::write() 的代码。 async_write() 来自我用作此应用程序基础的教程,如果需要或有理由仅使用其中一个,可以轻松更改为阻塞写入。
  • 我建议你重新考虑你的设计。它可以通过仅使用异步操作来实现。摆脱同步写入和互斥锁,改用链。与async_write 相比,async_wait 使用单独的完成处理程序。您需要maintain an outgoing message queue 以避免交错写入。您还应该检查提供给异步完成处理程序的错误参数。
  • 是的,您需要确保在给定套接字上最多有一个写操作未完成。队列就是这样发生的。

标签: multithreading sockets boost tcp boost-asio


【解决方案1】:

好的,山姆·米勒 (Sam Miller) 在这里得到了答案,但他将其发布为评论,所以我现在回答以关闭问题,因为我已经弄清楚了。最终,该错误很可能是交错写入调用和访问对象数据的问题。我已经重新编写了我的示例代码,只包含一个类(而不是上面的两个),使用 Sam 在他已经链接到的其他答案中提供的指南。我还使所有的写操作都是异步的。现在是代码:

#include <iostream>
#include <string>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <deque>

using boost::asio::ip::tcp;
using namespace std;

class tcp_server {
public:
    tcp_server(boost::asio::io_service& io_service)
        : _acceptor(io_service, tcp::endpoint(tcp::v4(), 5005)), _socket(io_service)
    {
        messages = std::deque<std::string> (1,"Hello from Jake's shitty server");

        timer_ = new boost::asio::deadline_timer(io_service, boost::posix_time::milliseconds(250));

        start_accept();
    }

    void write(std::string message) {
        boost::unique_lock<boost::shared_mutex> queueLock(queueMutex);
        messages.push_back(message);
        if (messages.size() <= 1)
            handle_write();
    }
private:
    void start_accept() {
        _acceptor.async_accept(_socket,
            boost::bind(&tcp_server::handle_accept, this,
            boost::asio::placeholders::error));
    }

    void handle_accept(boost::system::error_code e) {
        if (!messages.empty()) {

            _message = messages.front();
            messages.pop_front();

            boost::asio::async_write(_socket, boost::asio::buffer(_message),
                boost::bind(&tcp_server::handle_write, this));

        }
    }

    void handle_write() {

        if (!messages.empty()) {
            timer_->expires_at(timer_->expires_at() + boost::posix_time::milliseconds(1500));
            timer_->async_wait(boost::bind(&tcp_server::handle_accept, this, boost::asio::placeholders::error));
        }

        return;
    }

    std::string _message;
    std::deque<std::string> messages;

    tcp::acceptor _acceptor;
    tcp::socket _socket;
    boost::asio::deadline_timer * timer_;
    boost::shared_mutex queueMutex;


};


tcp_server * server;

void addMessages() {
    for (int i = 0; i < 10; i++) {
        server->write("New Message. Count: " + std::to_string(i) + ".\n");
    }
}


int _tmain(int argc, _TCHAR* argv[])
{
    boost::asio::io_service io_service;
    server = new tcp_server(io_service);

    server->write("Hey there sexy");
    boost::thread messenger(addMessages);

    io_service.run();

    return 0;
}

TL;DR 使用消息队列,不要混合异步/同步写入。

另外,我在工作时遇到的一个有趣的问题是,我正在使用从消息队列中弹出的临时字符串填充 boost::asio::buffer。这一直使 VS 2013 的调试断言失败,说字符串迭代器不可取消引用。一旦我将 _message 属性添加到类中,并使用它来构建缓冲区,一切都很好。在这里找到提示:Expression: string iterator not dereferencable while using boost regex。感谢山姆的帮助!

【讨论】:

  • 很高兴你知道了!
猜你喜欢
  • 2012-07-19
  • 2015-06-09
  • 1970-01-01
  • 1970-01-01
  • 2015-06-11
  • 1970-01-01
  • 2023-03-13
  • 2011-10-15
  • 2011-12-05
相关资源
最近更新 更多