【问题标题】:How to use boost::beast, download a file no blocking and with responses如何使用 boost::beast,下载无阻塞且有响应的文件
【发布时间】:2020-11-29 11:05:38
【问题描述】:

我从this example 开始,所以不会发布所有代码。我的目标是下载一个大文件而不阻塞我的主线程。第二个目标是获取通知,以便我可以更新进度条。我确实有几种方式工作的代码。首先是ioc.run(); 让它开始工作,我下载了文件。但是我无论如何都找不到在不阻塞的情况下启动会话。

第二种方式我可以拨打http::async_read_some 并且呼叫有效,但我无法得到我可以使用的响应。我不知道是否有办法传递捕获的 lambda。

#if 0..#else..#endif 切换方法。我敢肯定有一个简单的方法,但我就是看不到它。当我开始工作时,我会清理代码,比如设置本地文件名。谢谢。

    std::size_t on_read_some(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        if (ec);//deal with it... 
        if (!bValidConnection) {
            std::string_view view((const char*)buffer_.data().data(), bytes_transferred);
            auto pos = view.find("Content-Length:");
            if (pos == std::string_view::npos)
                ;//error
            file_size = std::stoi(view.substr(pos+sizeof("Content-Length:")).data());
            if (!file_size)
                ;//error
            bValidConnection = true;
        }
        else {
            file_pos += bytes_transferred;
            response_call(ec, file_pos);
        }
#if 0
        std::cout << "in on_read_some caller\n";
        http::async_read_some(stream_, buffer_, file_parser_, std::bind(
            response_call,
            std::placeholders::_1,
            std::placeholders::_2));
#else
        std::cout << "in on_read_some inner\n";
        http::async_read_some(stream_, buffer_, file_parser_, std::bind(
            &session::on_read_some,
            shared_from_this(),
            std::placeholders::_1,
            std::placeholders::_2));
#endif
        return buffer_.size();
    }

主要的,凌乱的但是.....

struct lambda_type {
    bool bDone = false;
    void operator ()(const boost::system::error_code ec, std::size_t bytes_transferred) {
        ;
    }
};
int main(int argc, char** argv)
{
    auto const host = "reserveanalyst.com";
    auto const port = "443";
    auto const target = "/downloads/demo.msi";
    int version = argc == 5 && !std::strcmp("1.0", argv[4]) ? 10 : 11;

    boost::asio::io_context ioc;
    ssl::context ctx{ ssl::context::sslv23_client };

    load_root_certificates(ctx);
    //ctx.load_verify_file("ca.pem");

    auto so = std::make_shared<session>(ioc, ctx);
    so->run(host, port, target, version);

    bool bDone = false;
    auto const lambda = [](const boost::system::error_code ec, std::size_t bytes_transferred) {
        std::cout << "data lambda bytes: " << bytes_transferred << " er: " << ec.message() << std::endl;
    };

    lambda_type lambda2;
    so->set_response_call(lambda);
    ioc.run();

    std::cout << "not in ioc.run()!!!!!!!!" << std::endl;

    so->async_read_some(lambda);

    //pseudo message pump when working.........
    for (;;) {
        std::this_thread::sleep_for(250ms);
        std::cout << "time" << std::endl;
    }
    return EXIT_SUCCESS;
}

还有我添加到class session的东西

class session : public std::enable_shared_from_this<session>
{
        using response_call_type = void(*)(boost::system::error_code ec, std::size_t bytes_transferred);
        http::response_parser<http::file_body> file_parser_;
        response_call_type response_call;
        //
        bool bValidConnection = false;
        std::size_t file_pos = 0;
        std::size_t file_size = 0;
    
    public:
        auto& get_result() { return res_; }
        auto& get_buffer() { return buffer_; }
        void set_response_call(response_call_type the_call) { response_call = the_call; }

【问题讨论】:

  • 我通常会在发布问题之前弄清楚。这一次似乎我想出了 a 解决方案,即使它似乎并不优雅。表示使用全局变量。
  • 我不知道你为什么要写lambda_type。或者为什么你的 lambda 不是 [&amp;]
  • 另外,您发布了代码,但您没有描述您发布的代码会发生什么。
  • 嗨@Yakk - Adam Nevraumont,lambda_type 只是测试的一部分。如果我在 main 中使用 lambda 捕获,它将无法编译。发布的代码编译并运行良好。万一它下载整个文件。在第二种情况下,我可以获得文件的第一部分,但没有得到继续的响应。这是前两段。
  • 更具体地说明什么不能编译。将[&amp;] 添加到[] 会使某些内容无法编译?

标签: c++ boost asio beast


【解决方案1】:

我强烈建议不要使用低级别的[async_]read_some 函数,而不是像使用http::response_parser&lt;http::buffer_body&gt; 那样使用http::[async_]read

我确实有一个例子 - 这有点复杂,因为它还使用 Boost Process 来同时解压缩正文数据,但无论如何它应该向您展示如何使用它:

How to read data from Internet using muli-threading with connecting only once?

如果给出更完整的代码,我想我可以根据您的具体示例对其进行定制,但也许以上就足够了?另请参阅我用作“灵感”的 libs/beast/example/doc/http_examples.hpp 中的“中继 HTTP 消息”。

注意:缓冲区算法不直观。我认为这是不幸的,不应该是必要的,因此请(非常)密切关注这些样本,以了解具体是如何完成的。

【讨论】:

  • 谢谢,我会在早上复习!有很多东西可以接受,因为我对野兽/asio 很熟悉。一个简单的获取多年前的文件和this 但我正在学习。
  • 另外,感谢您看到这一点。我的意思是打电话给我的so-&gt;async_read_some(lambda);森林,树木。但这并没有解决问题。
  • 好的!我有它的工作,并有很多更好的理解。再次感谢!你的链接揭示了很多。清理完示例后,我将发布我的解决方案。仅适用于将大文件下载到本地文件的情况。
【解决方案2】:

这是我最终想出的用于带有消息泵的应用程序。我正在为我的应用程序使用 MFC。对于像我这样喜欢 asio 的人来说,这是一个必看的视频。

CppCon 2016 Michael Caisse Asynchronous IO with BoostAsio

有几种方法可以运行它。有一个打开非阻塞的定义。这适用于下载大文件并显示带有取消按钮的进度对话框的情况。要启用取消按钮,请将 bool quit 设置为 true。评论 #define NO_BLOCKING 以在消息泵等待时下载一个小文件。

我认为我使用std::thread reader_thread; 的方式在这个应用程序中是合适的。我一次不会下载多个文件。我已经开始将它插入我的应用程序,一切看起来都不错。

关于传递 lambda 的问题,@Yakk - Adam Nevraumont 非常有帮助。阅读 his answer here 可以更清楚地了解如何使用 lambda 进行捕获。

如果与 libcripto 和 libssl 的链接被删除,则此代码应该可以正常编译和运行。我正在使用 libcripto-3 这是root_certificates.hpp 的副本。我查了一下,这个版本运行正常。

完整的代码。

// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
// Official repository: https://github.com/boostorg/beast
// Example: HTTP SSL client, asynchronous downloads

#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/error.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <fstream>
#include <memory>
#include <string>
#include <chrono>
#include <thread>

//don't need the cert in a file method or use 
    //don't need the cert in a file method or use 
#include "root_certificates.hpp"
#pragma comment(lib, "C:\\cpp\\openssl-master\\libcrypto.lib")
#pragma comment(lib, "C:\\cpp\\openssl-master\\libssl.lib")

using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>
namespace ssl = boost::asio::ssl;       // from <boost/asio/ssl.hpp>
namespace http = boost::beast::http;    // from <boost/beast/http.hpp>

void session_fail(boost::system::error_code ec, char const* what) {
    std::cerr << what << ": " << ec.message() << "\n";
}

class session : public std::enable_shared_from_this<session>
{
public:
    enum responses {
        resp_null,
        resp_ok,
        resp_done,
    };
    using response_call_type = std::function< void(responses, std::size_t)>;
protected:
    tcp::resolver resolver_;
    ssl::stream<tcp::socket> stream_;
    boost::beast::flat_buffer buffer_; // (Must persist between reads)
    http::request<http::empty_body> req_;
    http::response<http::string_body> res_;
    boost::beast::http::request_parser<boost::beast::http::string_body> header_parser_;
    http::response_parser<http::file_body> file_parser_;
    response_call_type response_call;
    boost::system::error_code file_open_ec;
    //
    std::size_t file_pos = 0;
    std::size_t file_size = 0;

public:
    explicit session(boost::asio::io_context& ioc, ssl::context& ctx, const char* filename)
        : resolver_(ioc)
        , stream_(ioc, ctx)
    {
        file_parser_.body_limit((std::numeric_limits<std::uint64_t>::max)());
        file_parser_.get().body().open(filename, boost::beast::file_mode::write, file_open_ec);
    }
    void run(char const* host, char const* port, char const* target, int version)
    {
        std::cout << "run" << std::endl;
        if (!SSL_set_tlsext_host_name(stream_.native_handle(), host))
        {
            boost::system::error_code ec{ static_cast<int>(::ERR_get_error()), boost::asio::error::get_ssl_category() };
            std::cerr << ec.message() << "\n";
            return;
        }
        // Set up an HTTP GET request message
        req_.version(version);
        req_.method(http::verb::get);
        req_.target(target);
        req_.set(http::field::host, host);
        req_.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);

        // Look up the domain name
        resolver_.async_resolve(host, port, std::bind(
            &session::on_resolve,
            shared_from_this(),
            std::placeholders::_1,
            std::placeholders::_2));
    }
    void on_resolve(boost::system::error_code ec, tcp::resolver::results_type results)
    {
        std::cout << "on_resolve" << std::endl;
        if (ec)
            return session_fail(ec, "resolve");

        // Make the connection on the IP address we get from a lookup
        boost::asio::async_connect(stream_.next_layer(), results.begin(), results.end(), std::bind(
            &session::on_connect,
            shared_from_this(),
            std::placeholders::_1));
    }
    void on_connect(boost::system::error_code ec)
    {
        std::cout << "on_connect" << std::endl;
        if (ec)
            return session_fail(ec, "connect");

        // Perform the SSL handshake
        stream_.async_handshake(ssl::stream_base::client, std::bind(
            &session::on_handshake,
            shared_from_this(),
            std::placeholders::_1));
    }
    void on_handshake(boost::system::error_code ec)
    {
        std::cout << "on_handshake" << std::endl;
        if (ec)
            return session_fail(ec, "handshake");

        // Send the HTTP request to the remote host
        http::async_write(stream_, req_, std::bind(
            &session::on_write,
            shared_from_this(),
            std::placeholders::_1,
            std::placeholders::_2));
    }
    void on_write(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        std::cout << "on_write" << std::endl;
        if (ec)
            return session_fail(ec, "write");
        if (response_call)
            http::async_read_header(stream_, buffer_, file_parser_, std::bind(
                &session::on_startup,
                shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2));
        else
            http::async_read_header(stream_, buffer_, file_parser_, std::bind(
                &session::on_read,
                shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2));
    }
    std::size_t on_startup(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        std::cout << "on_startup: " << bytes_transferred << std::endl;
        std::string_view view((const char*)buffer_.data().data(), bytes_transferred);
        auto pos = view.find("Content-Length:");
        if (pos == std::string_view::npos)
            assert(true);//error
        file_size = std::stoi(view.substr(pos + sizeof("Content-Length:")).data());
        if (!file_size)
            assert(true);//error
        std::cout << "filesize: " << file_size << std::endl;
        http::async_read_some(stream_, buffer_, file_parser_, std::bind(
            &session::on_read_some,
            shared_from_this(),
            std::placeholders::_1,
            std::placeholders::_2));
        return buffer_.size();
    }
    std::size_t on_read_some(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        //std::cout << "on_read_some" << std::endl;
        if (ec) {
            session_fail(ec, "on_read_some");
            return 0;
        }
        file_pos += bytes_transferred;
        if (!bytes_transferred && file_pos) {
            on_shutdown(ec);
            return 0;
        }
        response_call(resp_ok, file_pos);

        //std::cout << "session::on_read_some: " << file_pos << std::endl;
        http::async_read_some(stream_, buffer_, file_parser_, std::bind(
            &session::on_read_some,
            shared_from_this(),
            std::placeholders::_1,
            std::placeholders::_2));
        return buffer_.size();
    }
    std::size_t on_read(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        file_pos += bytes_transferred;
        if (!bytes_transferred && file_pos) {
            on_shutdown(ec);
            return 0;
        }
        std::cout << "on_read: " << bytes_transferred << std::endl;
        http::async_read(stream_, buffer_, file_parser_,
            std::bind(&session::on_read,
                shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2));
        return buffer_.size();
    }
    void on_shutdown(boost::system::error_code ec)
    {
        std::cout << "on_shutdown" << std::endl;
        if (ec == boost::asio::error::eof) {
            // Rationale:
            // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
            ec.assign(0, ec.category());
        }
        if (response_call)
            response_call(resp_done, 0);
        if (ec)
            return session_fail(ec, "shutdown");
    }
    auto get_file_status() const { return file_open_ec; }
    void set_response_call(response_call_type the_call) { response_call = the_call; }
    std::size_t get_download_size() const { return file_size; }
    //std::string get_content() const { return "test"; }// return response;}
};

#define NO_BLOCKING
int main(int argc, char** argv)
{
    //in a UI app you will need to keep a persistant thread/pool;
    std::thread reader_thread;
    //for an application where this never changes, this can just be put in the session class
    auto const host = "reserveanalyst.com";
    auto const port = "443";
#ifdef NO_BLOCKING // the large file
    auto const target = "/downloads/demo.msi";
#else // the small file
    auto const target = "/server.xml";
#endif
    boost::asio::io_context ioc;
    ssl::context ctx{ ssl::context::sslv23_client };
    load_root_certificates(ctx);
    //end, put in the session class
    auto so = std::make_shared<session>(ioc, ctx, "content.txt");//may be big binary
    so->run(host, port, target, 11);//so->run(target);
    //
    session::responses glb_response = session::resp_null;
    bool test_bool = false; //stand in for 'SendMessage' values
    std::size_t buf_size = 0; //stand in for 'SendMessage' values
#ifdef NO_BLOCKING
    auto static const lambda = [&glb_response, &buf_size](session::responses response, std::size_t bytes_transferred) {
        glb_response = response;
        buf_size = bytes_transferred;
        //drive your progress bar from here in a GUI app
        //sizes = the_beast_object.get_file_size() - size;//because size is what is left
        //cDownloadProgreess.SetPos((LPARAM)(sizes * 100 / the_beast_object.get_file_size()));
    };
    so->set_response_call(lambda);
#else
    ioc.run();
    std::cout << "ioc run exited" << std::endl;
#endif

#ifdef NO_BLOCKING
//    reader_thread.swap(std::thread{ [&ioc]() { ioc.run(); } });
    std::thread new_thread{ [&ioc]() { ioc.run(); } };
    reader_thread.swap(new_thread);
#endif
    bool quit = false; //true: as if a cancel button was pushed; won't finish download
    //pseudo message pump
    for (int i = 0; ; ++i) {

        switch (glb_response) { //ad hoc as if messaged
        case session::responses::resp_ok:
            std::cout << "from sendmessage: " << buf_size << std::endl;
            break;
        case session::responses::resp_done:
            std::cout << "from sendmessage: done" << std::endl;
        }//switch
        glb_response = session::responses::resp_null;
        if (!(i % 10))
            std::cout << "in message pump, stopped: " << std::boolalpha << ioc.stopped() << std::endl;

        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        if (quit && i == 10) //the cancel message
            ioc.stop();
        if (ioc.stopped())//just quit to test join.
            break;
    }
    if (reader_thread.joinable())//in the case a thread was never started
        reader_thread.join();
    std::cout << "exiting, program was quit" << std::endl;

    return EXIT_SUCCESS;
}

【讨论】:

  • 错误:无法将“std::thread&”类型的非常量左值引用绑定到“std::thread”类型的右值 reader_thread.swap(std::thread{ [&ioc]() { ioc.run(); } });
  • 嗨@q0987,谢谢。我当时使用的编译器让我摆脱了这个问题。我还更新了现有文件的获取路径。
  • 在session::on_write函数中,http::async_read_header是否在file_parser_(即demo.msi)中插入https头?我假设答案是否定的,否则二进制文件(即 demo.msi)将无效。如果https头没有写入二进制文件,http::async_read_header(stream_, buffer_, file_parser_,对传递的参数file_parser_做了什么?谢谢!
  • 从文档中,http::async_read_header 的函数用于从流中异步读取完整的消息头到 basic_parser 的实例中,basic_paser 这里是file_parser_。
  • 嗨@q0987,是的,我明白了。但显然async_read_header 只会完成它的工作,不会写入文件。我不知道我从哪里得到这个例子。当我翻阅野兽文件夹时,我没有找到类似的东西。这可能是一个很好的问题。我得做更多的挖掘工作。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-08-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-08-09
相关资源
最近更新 更多