【问题标题】:boost::asio with named pipes; parent process can't read from pipe via async_read_some(带有命名管道的 boost::asio;父进程无法通过 async_read_some 从管道中读取)
【发布时间】:2021-05-16 18:18:39
【问题描述】:

问题陈述

我有一个 Windows 服务器/父进程,它创建两个命名管道,一个入站和一个出站。然后它启动一个客户端/子进程并将这些管道名称提供给连接到它们的子进程。

子进程能够成功地异步读写管道。父进程能够写入出站管道,但无法从入站管道读取(我的代码在 2.5 秒后超时)。

预期输出

我期望整个流程的输出如下:

Writing {Hello pipes!} asynchronously to {\\.\pipe\processWrapperOut}
Reading asynchronously from {\\.\pipe\processWrapperIn}
Wrote {12} bytes to {\\.\pipe\processWrapperOut}
Started read + write
Read {21} bytes, msg {Goodbye cruel world! } from {\\.\pipe\processWrapperIn}
Finished waiting for write
Wrote to pipe!
Finished waiting for read
Received {Goodbye cruel world! }
Child stdout: {
        Connecting to {\\.\pipe\processWrapperOut} and {\\.\pipe\processWrapperIn}
        Wrote {21} bytes asynchronously to {\\.\pipe\processWrapperIn}
        Read {12} bytes asynchronously from {\\.\pipe\processWrapperOut}
        Write handler completed
        Read handler completed
        Got {Hello pipes!}
        Exiting


Child stderr: {

实际输出

我得到的输出是:

Writing {Hello pipes!} asynchronously to {\\.\pipe\processWrapperOut}
Reading asynchronously from {\\.\pipe\processWrapperIn}
Wrote {12} bytes to {\\.\pipe\processWrapperOut}
Started read + write
Finished waiting for write
Wrote to pipe!
Finished waiting for read
Got exception {Timed out trying to read from pipe}
Child stdout: {
        Connecting to {\\.\pipe\processWrapperOut} and {\\.\pipe\processWrapperIn}
        Wrote {21} bytes asynchronously to {\\.\pipe\processWrapperIn}
        Read {12} bytes asynchronously from {\\.\pipe\processWrapperOut}
        Write handler completed
        Read handler completed
        Got {Hello pipes!}
        Exiting


Child stderr: {

完整代码

下面是完整的代码。我已尽量把它精简到最少,甚至删掉了那些我从未见过出错之处的错误检查。

父进程

/// AsyncPipeServer.cpp
#define WIN32_LEAN_AND_MEAN  // Exclude rarely-used stuff from Windows headers
#include <WinSDKVer.h>
#define _WIN32_WINNT 0x0601
#include <windows.h>

#include <future>
#include <iostream>
#include <sstream>
#include <string>

#include <SDKDDKVer.h>
#include <tchar.h>

#pragma warning(push, 0)
#include <boost/winapi/handle_info.hpp>
//
#include <boost/asio.hpp>
#include <boost/asio/basic_streambuf.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/windows/stream_handle.hpp>
#include <boost/optional/optional.hpp>
#include <boost/process.hpp>
#include <boost/process/child.hpp>
#include <boost/process/environment.hpp>
#include <boost/process/extend.hpp>
#include <boost/process/group.hpp>
#include <boost/process/io.hpp>
#pragma warning(pop)

// Forward declarations of the free helper functions of the parent program.
// Their definitions follow main() further down in this file.

// Creates an overlapped named pipe called `pipeName`, starts a connect on it using
// `overlapped`, and wraps the resulting handle in a stream_handle bound to `ioContext`.
boost::asio::windows::stream_handle CreateStreamHandle(
    const std::string& pipeName,
    DWORD openFlags,
    OVERLAPPED& overlapped,
    boost::asio::io_context& ioContext);
// Creates a job object configured with JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE and stores it in `localHandle`.
void SetToDieWithParent(HANDLE& localHandle);
// Converts a Win32 error code into its system message text.
std::string GetErrorAsString(const DWORD errorMessageID);
// Replaces every '\r' in `input` with "\n\t" so multi-line child output prints indented.
std::string ChangeNewlines(std::string input);

class ProcessWrapper
{
public:
    explicit ProcessWrapper(const std::string& cmdToLaunch)
        : m_cmdToLaunch(cmdToLaunch)
        , m_inPipeName{ "\\\\.\\pipe\\processWrapperIn" }
        , m_outPipeName{ "\\\\.\\pipe\\processWrapperOut" }
    {
    }

    ~ProcessWrapper()
    {
        try
        {
            m_ioContext.stop();
            m_ioThread.join();
        }
        catch (...)
        {
        }
    }

    void Start()
    {
        m_inPipe = CreateStreamHandle(m_inPipeName, PIPE_ACCESS_INBOUND, m_inOverlapped, m_ioContext);
        assert(m_inPipe->is_open());

        m_outPipe = CreateStreamHandle(m_outPipeName, PIPE_ACCESS_OUTBOUND, m_outOverlapped, m_ioContext);
        assert(m_outPipe->is_open());

        const auto cmd{ m_cmdToLaunch + " " + m_outPipeName + " " + m_inPipeName };

        HANDLE localHandle = nullptr;
        SetToDieWithParent(localHandle);
        m_group = std::make_unique<boost::process::group>(localHandle);
        m_child = std::make_unique<boost::process::child>(
            cmd,
            boost::process::std_in.close(),
            boost::process::std_out > m_stdOutput,  //so it can be written without anything
            boost::process::std_err > m_stdError,
            m_errorCode);
        if (!m_errorCode)
        {
            m_group->add(*m_child.get(), m_errorCode);
        }

        m_ioThread = std::thread{ [ctx = &m_ioContext, this] {
            while (this->IsRunning())
            {
                ctx->run();
            }
        } };
    }

    bool IsRunning()
    {
        if (!m_child)
            return false;
        return m_child->running(m_errorCode);
    }

    std::string SyncGetStdOut()
    {
        return GetPipeStreamImpl(m_stdOutput, m_output);
    }

    std::string SyncGetStdError()
    {
        return GetPipeStreamImpl(m_stdError, m_error);
    }

    std::future<bool> WriteToPipe(const std::string& msg)
    {
        auto promise{ std::make_shared<std::promise<bool>>() };
        if (!m_outPipe)
        {
            promise->set_value(false);
            return promise->get_future();
        }

        assert(m_outPipe->is_open());
        {
            std::stringstream ss;
            ss << "Writing {" << msg << "} asynchronously to {" << m_outPipeName << "}\n";
            std::cout << ss.str();
        }

        const auto buffPtr{ std::make_shared<boost::asio::const_buffer>(msg.c_str(), msg.length()) };

        m_outPipe->async_write_some(
            *buffPtr,
            [pipeName = m_outPipeName, promise, buffPtr](
                const boost::system::error_code& ec, const std::size_t bytes_written) {
                if (ec)
                {
                    std::stringstream ss{};
                    ss << "Had error writing {" << ec.message() << "} to {" << pipeName << "}";
                    std::cerr << ss.str() << '\n';
                    promise->set_exception(std::make_exception_ptr(std::runtime_error{ ss.str() }));
                }
                else
                {
                    {
                        std::stringstream ss;
                        ss << "Wrote {" << bytes_written << "} bytes to {" << pipeName << "}\n";
                        std::cout << ss.str();
                    }
                    promise->set_value(true);
                }
            });
        return promise->get_future();
    }

    std::future<boost::optional<std::string>> ReadFromPipe()
    {
        auto promise{ std::make_shared<std::promise<boost::optional<std::string>>>() };
        if (!m_inPipe)
        {
            promise->set_value({});
            return promise->get_future();
        }
        assert(m_inPipe->is_open());
        {
            std::stringstream ss;
            ss << "Reading asynchronously from {" << m_inPipeName << "}\n";
            std::cout << ss.str();
        }
        constexpr auto size{ 256 };
        auto buff{ std::make_shared<boost::asio::streambuf>() };
        m_inPipe->async_read_some(
            buff->prepare(size),
            [pipeName = m_inPipeName, promise, buff, size](
                const boost::system::error_code& ec, const std::size_t bytes_transferred) {
                try
                {
                    if (ec)
                    {
                        std::stringstream ss{};
                        ss << "Had error reading {" << ec.message() << "} from {" << pipeName << "}";
                        std::cerr << ss.str() << '\n';
                        promise->set_exception(std::make_exception_ptr(std::runtime_error{ ss.str() }));
                    }
                    else
                    {
                        buff->commit(bytes_transferred);
                        std::istream is{ buff.get() };
                        std::string retval{};
                        retval.resize(size);
                        is.read(&retval.front(), size);
                        retval.resize(is.gcount());
                        {
                            std::stringstream ss;
                            ss << "Read {" << bytes_transferred << "} bytes, msg {" << retval << "} from {" << pipeName
                               << "}\n";
                            std::cout << ss.str();
                        }
                        promise->set_value(retval);
                    }
                }
                catch (...)
                {
                    promise->set_exception(std::current_exception());
                }
            });
        return promise->get_future();
    }

private:
    std::string m_cmdToLaunch;

    boost::asio::io_context m_ioContext;
    std::thread m_ioThread;

    std::string m_inPipeName;
    OVERLAPPED m_inOverlapped{};
    boost::optional<boost::asio::windows::stream_handle> m_inPipe;
    std::string m_outPipeName;
    OVERLAPPED m_outOverlapped{};
    boost::optional<boost::asio::windows::stream_handle> m_outPipe;

    boost::process::ipstream m_stdOutput;
    std::string m_output;
    boost::process::ipstream m_stdError;
    std::string m_error;

    std::unique_ptr<boost::process::child> m_child;
    std::unique_ptr<boost::process::group> m_group;
    std::error_code m_errorCode;

    std::string GetPipeStreamImpl(boost::process::ipstream& stream, std::string& errorOrOutput)
    {
        if (!m_child)
        {
            return errorOrOutput;
        }
        m_child->wait();
        std::string line{};
        while (std::getline(stream, line))
        {
            errorOrOutput += line;
        }
        m_stdOutput.pipe().close();
        return errorOrOutput;
    }
};

/// Parent entry point: launches AsyncPipeMain.exe, sends it one message over the
/// outbound pipe, waits for one message on the inbound pipe, and finally dumps
/// the child's captured stdout and stderr.
int main()
{
    ProcessWrapper process("AsyncPipeMain.exe");
    process.Start();
    // Give the child a moment to start up and connect to the pipes.
    std::this_thread::sleep_for(std::chrono::milliseconds{ 50 });
    try
    {
        auto write{ process.WriteToPipe("Hello pipes!") };
        auto msg{ process.ReadFromPipe() };
        std::cout << "Started read + write\n";

        const auto writeReady{ write.wait_for(std::chrono::milliseconds{ 250 }) };
        std::cout << "Finished waiting for write\n";
        if (writeReady != std::future_status::ready)
        {
            throw std::runtime_error{ "Timed out trying to write to pipe" };
        }
        // get() rethrows the exception stored by the write handler, if any.
        const auto writeVal{ write.get() };
        std::cout << (writeVal ? "Wrote to pipe!\n" : "Did not write to pipe!\n");

        const auto readReady{ msg.wait_for(std::chrono::milliseconds{ 250 }) };
        std::cout << "Finished waiting for read\n";
        if (readReady != std::future_status::ready)
        {
            throw std::runtime_error{ "Timed out trying to read from pipe" };
        }
        const auto msgVal{ msg.get() };
        if (msgVal)
        {
            std::cout << "Received {" << msgVal.value() << "}\n";
            //assert("Goodbye cruel world!" == msgVal.value());
        }
        else
        {
            std::cout << "Didn't receive a msg\n";
        }
    }
    catch (const std::exception& e)
    {
        std::cout << "Got exception {" << e.what() << "}\n";
    }

    std::cout << "Child stdout: {\n";
    std::cout << '\t' << ChangeNewlines(process.SyncGetStdOut()) << "\n}\n";
    // This is a report of the child's stderr, not an error of this process:
    // keep header, body and closing brace on the same stream so the block cannot
    // be split or reordered between stdout and stderr.
    std::cout << "Child stderr: {\n";
    std::cout << '\t' << ChangeNewlines(process.SyncGetStdError()) << "\n}\n";
}

/// Creates the server end of a named pipe in overlapped mode, starts waiting for
/// a client to connect, and wraps the handle in an Asio stream_handle.
///
/// @param pipeName   Full pipe name, e.g. "\\\\.\\pipe\\name".
/// @param openFlags  PIPE_ACCESS_* flags; FILE_FLAG_OVERLAPPED is added here.
/// @param overlapped OVERLAPPED structure used for the ConnectNamedPipe call.
///                   NOTE(review): the connect may still be pending when this
///                   function returns, so the structure presumably has to stay
///                   valid until a client has connected — confirm with the
///                   ConnectNamedPipe documentation.
/// @param ioContext  Context the returned stream_handle is bound to.
/// @return A stream_handle that owns the pipe handle.
/// @throws std::runtime_error if the pipe cannot be created or the connect
///         cannot be started.
boost::asio::windows::stream_handle CreateStreamHandle(
    const std::string& pipeName,
    DWORD openFlags,
    OVERLAPPED& overlapped,
    boost::asio::io_context& ioContext)
{
    auto* const pipeHandle{ CreateNamedPipeA(
        pipeName.c_str(), openFlags | FILE_FLAG_OVERLAPPED, 0, PIPE_UNLIMITED_INSTANCES, 8192, 8192, 0, nullptr) };

    if (INVALID_HANDLE_VALUE == pipeHandle)
    {
        throw std::runtime_error{ "could not open pipe" };
    }

    const auto connectResult{ ConnectNamedPipe(pipeHandle, &overlapped) };
    const auto lastErr{ ::GetLastError() };

    // Accepted outcomes are a zero return with either "still pending" or
    // "client already connected"; everything else is treated as a failure.
    const bool connectStarted{
        0 == connectResult && (ERROR_IO_PENDING == lastErr || ERROR_PIPE_CONNECTED == lastErr)
    };
    if (!connectStarted)
    {
        // Nobody owns the handle yet, so release it before reporting the error.
        ::CloseHandle(pipeHandle);
        throw std::runtime_error{ "could not connect to pipe" };
    }

    return { ioContext, pipeHandle };
}

/// Creates an unnamed job object with JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE set and
/// stores its handle in `localHandle`.
///
/// @param localHandle Must be null on entry. Receives the job handle on success
///                    and is left untouched (null) if an exception is thrown.
/// @throws std::runtime_error if the job object cannot be created or configured.
void SetToDieWithParent(HANDLE& localHandle)
{
    assert(!localHandle);

    const HANDLE job{ CreateJobObject(nullptr, nullptr) };
    if (!job)
    {
        throw std::runtime_error("Internal error: Could not create job object");
    }

    JOBOBJECT_EXTENDED_LIMIT_INFORMATION jobInfo{};
    jobInfo.BasicLimitInformation.LimitFlags |= JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
    if (!SetInformationJobObject(job, JobObjectExtendedLimitInformation, &jobInfo, sizeof(jobInfo)))
    {
        // Do not leak the half-configured job object to the caller.
        CloseHandle(job);
        throw std::runtime_error("Internal error: Could not set up job object to handle");
    }

    localHandle = job;
}

//https://stackoverflow.com/a/17387176/2025214
//Returns the last Win32 error, in string format. Returns an empty string if there is no error.
std::string GetErrorAsString(const DWORD errorMessageID)
{
    if (errorMessageID == 0)
    {
        return std::string();  //No error message has been recorded
    }
    LPSTR messageBuffer = nullptr;
    size_t size = FormatMessageA(
        FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
        NULL,
        errorMessageID,
        MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
        reinterpret_cast<LPSTR>(&messageBuffer),
        0,
        NULL);
    std::string message(messageBuffer, size);
    LocalFree(messageBuffer);
    return message;
}

// Throwaway helper that makes captured child output legible: every carriage
// return in `input` becomes a line feed followed by a tab, so continuation
// lines print indented. All other characters are copied unchanged.
std::string ChangeNewlines(std::string input)
{
    std::string indented;
    indented.reserve(input.size());
    for (const char ch : input)
    {
        if (ch == '\r')
        {
            indented += "\n\t";
        }
        else
        {
            indented += ch;
        }
    }
    return indented;
}

子进程

#define WIN32_LEAN_AND_MEAN  // Exclude rarely-used stuff from Windows headers
#include <WinSDKVer.h>
#define _WIN32_WINNT 0x0601
#include <windows.h>

#include <iostream>
#include <sstream>

#include <boost/asio/windows/stream_handle.hpp>
#include <boost/utility/string_view.hpp>

// Forward declarations for the child program. CreateStreamHandle is defined
// after main(); the definition of GetErrorAsString is not repeated in this
// listing (see the note at the end of the listing).

// Converts a Win32 error code into its system message text.
std::string GetErrorAsString(DWORD errorMessageID);
// Opens the existing named pipe `pipeName` with the requested access in
// overlapped mode and wraps the handle in a stream_handle bound to `ioContext`.
boost::asio::windows::stream_handle CreateStreamHandle(
    boost::string_view pipeName,
    DWORD accessFlag,
    boost::asio::io_context& ioContext);

/// Child entry point.
///
/// Expects two arguments: the name of the pipe to read from and the name of the
/// pipe to write to. Starts one asynchronous read and one asynchronous write,
/// runs the io_context on a helper thread and waits (for at most five seconds)
/// until both handlers have run.
///
/// @return 0 if both operations completed without error, -1 otherwise
///         (including missing arguments, a timeout or an exception).
int main(const int argc, const char* const* const argv)
{
    // Both pipe names are mandatory; indexing argv without this check would be
    // undefined behaviour when the program is started by hand.
    if (argc < 3)
    {
        std::cerr << "Usage: " << (argc > 0 ? argv[0] : "AsyncPipeMain") << " <pipe to read> <pipe to write>\n";
        return -1;
    }

    std::cout << "Connecting to {" << argv[1] << "} and {" << argv[2] << "}\n";
    const boost::string_view inPipeName{ argv[1] };
    const boost::string_view outPipeName{ argv[2] };
    boost::asio::io_context io{};

    try
    {
        auto in{ CreateStreamHandle(inPipeName, GENERIC_READ, io) };
        auto out{ CreateStreamHandle(outPipeName, GENERIC_WRITE, io) };

        boost::system::error_code inError{};
        std::array<char, 256> buffer{ '\0' };
        std::atomic<size_t> bytesRead{};
        boost::asio::mutable_buffer buff{ buffer.data(), buffer.size() };
        std::atomic<bool> finishedReading{ false };
        in.async_read_some(
            buff,
            [inPipeName, &finishedReading, &inError, &bytesRead](
                const boost::system::error_code& ec, const std::size_t bytes_transferred) {
                std::cout << "Read {" << bytes_transferred << "} bytes asynchronously from {" << inPipeName << "}\n";
                // Record the outcome before publishing the "finished" flag so the
                // exit-status check below sees it.
                inError = ec;
                bytesRead.store(bytes_transferred);
                finishedReading.store(true);
            });

        boost::system::error_code outError{};
        std::atomic<bool> finishedWriting{ false };
        // sizeof includes the terminating NUL, i.e. 21 bytes are sent, exactly as
        // the previously hard-coded length did.
        constexpr char outMsg[]{ "Goodbye cruel world!" };
        const boost::asio::const_buffer outBuff{ outMsg, sizeof(outMsg) };
        out.async_write_some(
            outBuff,
            [outPipeName, &finishedWriting, &outError](
                const boost::system::error_code& ec, const std::size_t bytes_written) {
                std::cout << "Wrote {" << bytes_written << "} bytes asynchronously to {" << outPipeName << "}\n";
                outError = ec;
                finishedWriting.store(true);
            });

        const auto start{ std::chrono::system_clock::now() };
        const auto end{ start + std::chrono::seconds{ 5 } };
        auto current{ start };
        const auto step{ (end - start) / 100 };

        std::thread ioThread{ [end, &io, &finishedReading, &finishedWriting] {
            while (std::chrono::system_clock::now() < end && (!finishedReading.load() || !finishedWriting.load()))
            {
                io.run();
            }
        } };

        // Poll for completion, printing a progress dot every `step`, until both
        // handlers ran or the deadline passed.
        while (!finishedReading.load() || !finishedWriting.load())
        {
            const auto now{ std::chrono::system_clock::now() };
            if (end < now)
            {
                std::cout << "\n";
                break;
            }
            if ((current + step) < now)
            {
                current = now;
                std::cout << " .";
            }
            // Avoid burning a full core while waiting.
            std::this_thread::sleep_for(std::chrono::milliseconds{ 1 });
        }

        if (finishedWriting.load())
        {
            std::cout << "Write handler completed\n";
        }

        if (finishedReading.load())
        {
            std::cout << "Read handler completed\n";
            std::cout << "Got {";
            std::cout.write(buffer.data(), bytesRead.load());
            std::cout << "}\n";
        }

        // stop() unblocks run() if an operation never completed; after join() the
        // error codes written by the handlers are safe to read here.
        io.stop();
        ioThread.join();
        std::cout << "Exiting\n";

        if (finishedReading.load() && !inError && finishedWriting.load() && !outError)
        {
            return 0;
        }
        return -1;
    }
    catch (const std::exception& e)
    {
        std::cerr << "Caught exception {" << e.what() << "}\n";
        return -1;
    }
}

/// Opens the client end of an existing named pipe in overlapped mode and wraps
/// the handle in an Asio stream_handle.
///
/// @param pipeName   Full name of the pipe to open.
/// @param accessFlag Desired access, e.g. GENERIC_READ or GENERIC_WRITE.
/// @param ioContext  Context the returned stream_handle is bound to.
/// @return A stream_handle that owns the opened pipe handle.
/// @throws std::runtime_error if the pipe cannot be opened.
boost::asio::windows::stream_handle CreateStreamHandle(
    const boost::string_view pipeName,
    const DWORD accessFlag,
    boost::asio::io_context& ioContext)
{
    // A string_view is not guaranteed to be NUL-terminated, but CreateFileA
    // requires a C string, so make an owned, terminated copy.
    const std::string name(pipeName.begin(), pipeName.end());

    const auto pipeHandle{
        CreateFileA(
            name.c_str(),          // pipe name
            accessFlag,            // access
            0,                     // no sharing
            nullptr,               // default security attributes
            OPEN_EXISTING,         // opens existing pipe
            FILE_FLAG_OVERLAPPED,  // async
            nullptr)               // no template file
    };

    if (INVALID_HANDLE_VALUE == pipeHandle)
    {
        // Capture the error code immediately, before anything else can change it.
        const auto lastErr{ ::GetLastError() };
        throw std::runtime_error{ "could not open pipe {" + name + "}, error " + std::to_string(lastErr) };
    }

    return { ioContext, pipeHandle };
}

//note that GetErrorAsString is also needed in this main

【问题讨论】:

  • 为什么要结合十几种不同的方法?具体来说,为什么要同时使用异步流和同步流?为什么要使用 Boost 的 async_pipe 以及您自己的命名管道? Boost Process 的管道有什么没有做的吗?或者从另一个角度来看:你想要实现什么,Boost 是否添加了什么?
  • 嘿嘿嘿,感谢您的光临。此问题中的所有代码都使用boost::asio::windows::stream_handle 并调用async_*_some API。我理解这种混淆,因为我包含了 async_pipe.hpp 标头(将对其进行编辑),但您会注意到我实际上并没有在代码中使用它们(我在第一次尝试时使用它们代替 stream_handle,但是他们根本没有工作)。我想要实现的是在 Windows(最终是 Unix)上异步使用命名管道,而无需自己编写低级代码。 boost::asio 是我尝试的第一件事。
  • 我怀疑async_read_some 正在吞掉错误。当读取还在等待超时的时候,我尝试从另一个线程窥视(peek)管道,结果得到了“管道关闭”错误。但是async_read_some 声称它会报告从 API 得到的任何错误,那它为什么不告诉我管道已关闭?我原本假设同步 API 可能会把这个错误暴露出来,于是写了我“答案”中的代码,结果这段代码却能正常工作,这并没有让我更接近弄清楚异步 API 为什么不起作用。
  • 我认为 async_pipe 应该可以正常工作(无论如何它在内部使用stream_handle,并且省去了设置管道的所有麻烦)。如果没有,那很烦人——这可能是直截了当的事情。我没有windows环境,所以我认为我无法在合理的时间范围内查看它
  • 另外,async_read_some 不会吞掉错误。如果它真的吞掉了,那就是一个 bug(或者分配给stream_handle 的句柄不符合某些文档规定的条件?)

标签: c++ windows boost boost-asio


【解决方案1】:

可行的替代解决方案

我发现改用非异步的 read_some 时,下面这段代码可以正常工作:

    std::future<boost::optional<std::string>> ReadFromPipe()
    {
        auto promise{ std::make_shared<std::promise<boost::optional<std::string>>>() };
        if (!m_inPipe)
        {
            promise->set_value({});
            return promise->get_future();
        }
        assert(m_inPipe->is_open());
        {
            std::stringstream ss;
            ss << "Reading asynchronously from {" << m_inPipeName << "}\n";
            std::cout << ss.str();
        }
        constexpr auto size{ 256 };
        auto buff{ std::make_shared<boost::asio::streambuf>() };
        m_inPipe->async_read_some(
            buff->prepare(size),
            [pipeName = m_inPipeName, promise, buff, size](
                const boost::system::error_code& ec, const std::size_t bytes_transferred) {
                try
                {
                    if (ec)
                    {
                        std::stringstream ss{};
                        ss << "Had error reading {" << ec.message() << "} from {" << pipeName << "}";
                        std::cerr << ss.str() << '\n';
                        promise->set_exception(std::make_exception_ptr(std::runtime_error{ ss.str() }));
                    }
                    else
                    {
                        buff->commit(bytes_transferred);
                        std::istream is{ buff.get() };
                        std::string retval{};
                        retval.resize(size);
                        is.read(&retval.front(), size);
                        retval.resize(is.gcount());
                        {
                            std::stringstream ss;
                            ss << "Read {" << bytes_transferred << "} bytes, msg {" << retval << "} from {" << pipeName
                               << "}\n";
                            std::cout << ss.str();
                        }
                        promise->set_value(retval);
                    }
                }
                catch (...)
                {
                    promise->set_exception(std::current_exception());
                }
            });
        return promise->get_future();
    }

但我更喜欢使用async_read_some 方法。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-03
    • 2016-11-14
    • 1970-01-01
    • 1970-01-01
    • 2014-08-03
    相关资源
    最近更新 更多