【发布时间】:2021-05-16 18:18:39
【问题描述】:
问题陈述
我有一个 Windows 服务器/父进程,它创建两个命名管道,一个入站和一个出站。然后它启动一个客户端/子进程并将这些管道名称提供给连接到它们的子进程。
子进程能够成功地异步读写管道。父进程能够写入出站管道,但无法从入站管道读取(我的代码在 2.5 秒后超时)。
预期输出
我对整个 shebang 的期望是:
Writing {Hello pipes!} asynchronously to {\\.\pipe\processWrapperOut}
Reading asynchronously from {\\.\pipe\processWrapperIn}
Wrote {12} bytes to {\\.\pipe\processWrapperOut}
Started read + write
Read {21} bytes, msg {Goodbye cruel world! } from {\\.\pipe\processWrapperIn}
Finished waiting for write
Wrote to pipe!
Finished waiting for read
Received {Goodbye cruel world! }
Child stdout: {
Connecting to {\\.\pipe\processWrapperOut} and {\\.\pipe\processWrapperIn}
Wrote {21} bytes asynchronously to {\\.\pipe\processWrapperIn}
Read {12} bytes asynchronously from {\\.\pipe\processWrapperOut}
Write handler completed
Read handler completed
Got {Hello pipes!}
Exiting
Child stderr: {
实际输出
我得到的输出是:
Writing {Hello pipes!} asynchronously to {\\.\pipe\processWrapperOut}
Reading asynchronously from {\\.\pipe\processWrapperIn}
Wrote {12} bytes to {\\.\pipe\processWrapperOut}
Started read + write
Finished waiting for write
Wrote to pipe!
Finished waiting for read
Got exception {Timed out trying to read from pipe}
Child stdout: {
Connecting to {\\.\pipe\processWrapperOut} and {\\.\pipe\processWrapperIn}
Wrote {21} bytes asynchronously to {\\.\pipe\processWrapperIn}
Read {12} bytes asynchronously from {\\.\pipe\processWrapperOut}
Write handler completed
Read handler completed
Got {Hello pipes!}
Exiting
Child stderr: {
完整代码
下面是完整的代码。我试图将其归结为尽可能少的代码,甚至从我没有看到错误的情况中删除错误检查。
父进程
/// AsyncPipeServer.cpp
#define WIN32_LEAN_AND_MEAN // Exclude rarely-used stuff from Windows headers
#include <WinSDKVer.h>
#define _WIN32_WINNT 0x0601
#include <windows.h>
#include <future>
#include <iostream>
#include <sstream>
#include <string>
#include <SDKDDKVer.h>
#include <tchar.h>
#pragma warning(push, 0)
#include <boost/winapi/handle_info.hpp>
//
#include <boost/asio.hpp>
#include <boost/asio/basic_streambuf.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/windows/stream_handle.hpp>
#include <boost/optional/optional.hpp>
#include <boost/process.hpp>
#include <boost/process/child.hpp>
#include <boost/process/environment.hpp>
#include <boost/process/extend.hpp>
#include <boost/process/group.hpp>
#include <boost/process/io.hpp>
#pragma warning(pop)
boost::asio::windows::stream_handle CreateStreamHandle(
const std::string& pipeName,
DWORD openFlags,
OVERLAPPED& overlapped,
boost::asio::io_context& ioContext);
void SetToDieWithParent(HANDLE& localHandle);
std::string GetErrorAsString(const DWORD errorMessageID);
std::string ChangeNewlines(std::string input);
class ProcessWrapper
{
public:
explicit ProcessWrapper(const std::string& cmdToLaunch)
: m_cmdToLaunch(cmdToLaunch)
, m_inPipeName{ "\\\\.\\pipe\\processWrapperIn" }
, m_outPipeName{ "\\\\.\\pipe\\processWrapperOut" }
{
}
~ProcessWrapper()
{
try
{
m_ioContext.stop();
m_ioThread.join();
}
catch (...)
{
}
}
void Start()
{
m_inPipe = CreateStreamHandle(m_inPipeName, PIPE_ACCESS_INBOUND, m_inOverlapped, m_ioContext);
assert(m_inPipe->is_open());
m_outPipe = CreateStreamHandle(m_outPipeName, PIPE_ACCESS_OUTBOUND, m_outOverlapped, m_ioContext);
assert(m_outPipe->is_open());
const auto cmd{ m_cmdToLaunch + " " + m_outPipeName + " " + m_inPipeName };
HANDLE localHandle = nullptr;
SetToDieWithParent(localHandle);
m_group = std::make_unique<boost::process::group>(localHandle);
m_child = std::make_unique<boost::process::child>(
cmd,
boost::process::std_in.close(),
boost::process::std_out > m_stdOutput, //so it can be written without anything
boost::process::std_err > m_stdError,
m_errorCode);
if (!m_errorCode)
{
m_group->add(*m_child.get(), m_errorCode);
}
m_ioThread = std::thread{ [ctx = &m_ioContext, this] {
while (this->IsRunning())
{
ctx->run();
}
} };
}
bool IsRunning()
{
if (!m_child)
return false;
return m_child->running(m_errorCode);
}
std::string SyncGetStdOut()
{
return GetPipeStreamImpl(m_stdOutput, m_output);
}
std::string SyncGetStdError()
{
return GetPipeStreamImpl(m_stdError, m_error);
}
std::future<bool> WriteToPipe(const std::string& msg)
{
auto promise{ std::make_shared<std::promise<bool>>() };
if (!m_outPipe)
{
promise->set_value(false);
return promise->get_future();
}
assert(m_outPipe->is_open());
{
std::stringstream ss;
ss << "Writing {" << msg << "} asynchronously to {" << m_outPipeName << "}\n";
std::cout << ss.str();
}
const auto buffPtr{ std::make_shared<boost::asio::const_buffer>(msg.c_str(), msg.length()) };
m_outPipe->async_write_some(
*buffPtr,
[pipeName = m_outPipeName, promise, buffPtr](
const boost::system::error_code& ec, const std::size_t bytes_written) {
if (ec)
{
std::stringstream ss{};
ss << "Had error writing {" << ec.message() << "} to {" << pipeName << "}";
std::cerr << ss.str() << '\n';
promise->set_exception(std::make_exception_ptr(std::runtime_error{ ss.str() }));
}
else
{
{
std::stringstream ss;
ss << "Wrote {" << bytes_written << "} bytes to {" << pipeName << "}\n";
std::cout << ss.str();
}
promise->set_value(true);
}
});
return promise->get_future();
}
std::future<boost::optional<std::string>> ReadFromPipe()
{
auto promise{ std::make_shared<std::promise<boost::optional<std::string>>>() };
if (!m_inPipe)
{
promise->set_value({});
return promise->get_future();
}
assert(m_inPipe->is_open());
{
std::stringstream ss;
ss << "Reading asynchronously from {" << m_inPipeName << "}\n";
std::cout << ss.str();
}
constexpr auto size{ 256 };
auto buff{ std::make_shared<boost::asio::streambuf>() };
m_inPipe->async_read_some(
buff->prepare(size),
[pipeName = m_inPipeName, promise, buff, size](
const boost::system::error_code& ec, const std::size_t bytes_transferred) {
try
{
if (ec)
{
std::stringstream ss{};
ss << "Had error reading {" << ec.message() << "} from {" << pipeName << "}";
std::cerr << ss.str() << '\n';
promise->set_exception(std::make_exception_ptr(std::runtime_error{ ss.str() }));
}
else
{
buff->commit(bytes_transferred);
std::istream is{ buff.get() };
std::string retval{};
retval.resize(size);
is.read(&retval.front(), size);
retval.resize(is.gcount());
{
std::stringstream ss;
ss << "Read {" << bytes_transferred << "} bytes, msg {" << retval << "} from {" << pipeName
<< "}\n";
std::cout << ss.str();
}
promise->set_value(retval);
}
}
catch (...)
{
promise->set_exception(std::current_exception());
}
});
return promise->get_future();
}
private:
std::string m_cmdToLaunch;
boost::asio::io_context m_ioContext;
std::thread m_ioThread;
std::string m_inPipeName;
OVERLAPPED m_inOverlapped{};
boost::optional<boost::asio::windows::stream_handle> m_inPipe;
std::string m_outPipeName;
OVERLAPPED m_outOverlapped{};
boost::optional<boost::asio::windows::stream_handle> m_outPipe;
boost::process::ipstream m_stdOutput;
std::string m_output;
boost::process::ipstream m_stdError;
std::string m_error;
std::unique_ptr<boost::process::child> m_child;
std::unique_ptr<boost::process::group> m_group;
std::error_code m_errorCode;
std::string GetPipeStreamImpl(boost::process::ipstream& stream, std::string& errorOrOutput)
{
if (!m_child)
{
return errorOrOutput;
}
m_child->wait();
std::string line{};
while (std::getline(stream, line))
{
errorOrOutput += line;
}
m_stdOutput.pipe().close();
return errorOrOutput;
}
};
int main()
{
ProcessWrapper process("AsyncPipeMain.exe");
process.Start();
std::this_thread::sleep_for(std::chrono::milliseconds{ 50 });
try
{
auto write{ process.WriteToPipe("Hello pipes!") };
auto msg{ process.ReadFromPipe() };
std::cout << "Started read + write\n";
const auto writeReady{ write.wait_for(std::chrono::milliseconds{ 250 }) };
std::cout << "Finished waiting for write\n";
if (writeReady != std::future_status::ready)
{
throw std::runtime_error{ "Timed out trying to write to pipe" };
}
const auto writeVal{ write.get() };
std::cout << (writeVal ? "Wrote to pipe!\n" : "Did not write to pipe!\n");
const auto readReady{ msg.wait_for(std::chrono::milliseconds{ 250 }) };
std::cout << "Finished waiting for read\n";
if (readReady != std::future_status::ready)
{
throw std::runtime_error{ "Timed out trying to read from pipe" };
}
const auto msgVal{ msg.get() };
if (msgVal)
{
std::cout << "Received {" << msgVal.value() << "}\n";
//assert("Goodbye cruel world!" == msgVal.value());
}
else
{
std::cout << "Didn't receive a msg\n";
}
}
catch (const std::exception& e)
{
std::cout << "Got exception {" << e.what() << "}\n";
}
std::cout << "Child stdout: {\n";
std::cout << '\t' << ChangeNewlines(process.SyncGetStdOut()) << "\n}\n";
std::cout << "Child stderr: {\n";
std::cerr << '\t' << ChangeNewlines(process.SyncGetStdError()) << "\n}\n";
}
boost::asio::windows::stream_handle CreateStreamHandle(
const std::string& pipeName,
DWORD openFlags,
OVERLAPPED& overlapped,
boost::asio::io_context& ioContext)
{
auto* const inHandle{ CreateNamedPipeA(
pipeName.c_str(), openFlags | FILE_FLAG_OVERLAPPED, 0, PIPE_UNLIMITED_INSTANCES, 8192, 8192, 0, nullptr) };
if (INVALID_HANDLE_VALUE == inHandle)
{
throw std::runtime_error{ "could not open pipe" };
}
const auto inConnected{ ConnectNamedPipe(inHandle, &overlapped) };
const auto lastErr{ ::GetLastError() };
if (0 != inConnected || ERROR_IO_PENDING != lastErr && ERROR_PIPE_CONNECTED != lastErr)
{
throw std::runtime_error{ "could not connect to pipe" };
}
return { ioContext, inHandle };
}
void SetToDieWithParent(HANDLE& localHandle)
{
assert(!localHandle);
localHandle = CreateJobObject(nullptr, nullptr);
JOBOBJECT_EXTENDED_LIMIT_INFORMATION jobInfo;
memset(&jobInfo, 0, sizeof(jobInfo));
jobInfo.BasicLimitInformation.LimitFlags |= JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
if (!SetInformationJobObject(localHandle, JobObjectExtendedLimitInformation, &jobInfo, sizeof(jobInfo)))
{
throw std::runtime_error("Internal error: Could not set up job object to handle");
}
}
//https://stackoverflow.com/a/17387176/2025214
//Returns the last Win32 error, in string format. Returns an empty string if there is no error.
std::string GetErrorAsString(const DWORD errorMessageID)
{
if (errorMessageID == 0)
{
return std::string(); //No error message has been recorded
}
LPSTR messageBuffer = nullptr;
size_t size = FormatMessageA(
FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
NULL,
errorMessageID,
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
reinterpret_cast<LPSTR>(&messageBuffer),
0,
NULL);
std::string message(messageBuffer, size);
LocalFree(messageBuffer);
return message;
}
//throwaway function to make output legible
std::string ChangeNewlines(std::string input)
{
for (auto newline{ input.find("\r") }; newline != input.npos; newline = input.find("\r", newline + 1))
{
input[newline] = '\n';
input.insert(newline + 1, "\t");
}
return input;
}
子进程
#define WIN32_LEAN_AND_MEAN // Exclude rarely-used stuff from Windows headers
#include <WinSDKVer.h>
#define _WIN32_WINNT 0x0601
#include <windows.h>
#include <iostream>
#include <sstream>
#include <boost/asio/windows/stream_handle.hpp>
#include <boost/utility/string_view.hpp>
std::string GetErrorAsString(DWORD errorMessageID);
boost::asio::windows::stream_handle CreateStreamHandle(
boost::string_view pipeName,
DWORD accessFlag,
boost::asio::io_context& ioContext);
int main(const int argc, const char* const* const argv)
{
std::cout << "Connecting to {" << argv[1] << "} and {" << argv[2] << "}\n";
const boost::string_view inPipeName{ argv[1] };
const boost::string_view outPipeName{ argv[2] };
boost::asio::io_context io{};
try
{
auto in{ CreateStreamHandle(inPipeName, GENERIC_READ, io) };
auto out{ CreateStreamHandle(outPipeName, GENERIC_WRITE, io) };
boost::system::error_code inError{};
std::array<char, 256> buffer{ '\0' };
std::atomic<size_t> bytesWritten{};
boost::asio::mutable_buffer buff{ buffer.data(), buffer.size() };
std::atomic<bool> finishedReading{ false };
in.async_read_some(
buff,
[inPipeName, &finishedReading, &inError, &bytesWritten](
const boost::system::error_code& ec, const std::size_t bytes_transferred) {
std::cout << "Read {" << bytes_transferred << "} bytes asynchronously from {" << inPipeName << "}\n";
bytesWritten.store(bytes_transferred);
finishedReading.store(true);
});
boost::system::error_code outError{};
std::atomic<bool> finishedWriting{ false };
constexpr auto outMsg{ "Goodbye cruel world!" };
const boost::asio::const_buffer outBuff{ outMsg, 21 };
out.async_write_some(
outBuff,
[outPipeName, &finishedWriting, &outError](
const boost::system::error_code& ec, const std::size_t bytes_written) {
std::cout << "Wrote {" << bytes_written << "} bytes asynchronously to {" << outPipeName << "}\n";
finishedWriting.store(true);
});
const auto start{ std::chrono::system_clock::now() };
const auto end{ start + std::chrono::seconds{ 5 } };
auto current{ start };
const auto step{ (end - start) / 100 };
std::thread ioThread{ [end, &io, &finishedReading, &finishedWriting] {
while (std::chrono::system_clock::now() < end && (!finishedReading.load() || !finishedWriting.load()))
{
io.run();
}
} };
while (!finishedReading.load() || !finishedWriting.load())
{
const auto now{ std::chrono::system_clock::now() };
if (end < now)
{
std::cout << "\n";
break;
}
if ((current + step) < now)
{
current = now;
std::cout << " .";
}
}
if (finishedWriting.load())
{
std::cout << "Write handler completed\n";
}
if (finishedReading.load())
{
std::cout << "Read handler completed\n";
std::cout << "Got {";
std::cout.write(buffer.data(), bytesWritten.load());
std::cout << "}\n";
}
io.stop();
ioThread.join();
std::cout << "Exiting\n";
if (finishedReading.load() && !inError && finishedWriting.load() && !outError)
{
return 0;
}
return -1;
}
catch (const std::exception& e)
{
std::cerr << "Caught exception {" << e.what() << "}\n";
return -1;
}
}
boost::asio::windows::stream_handle CreateStreamHandle(
const boost::string_view pipeName,
const DWORD accessFlag,
boost::asio::io_context& ioContext)
{
const auto inHandle{
CreateFileA(
pipeName.data(), // pipe name
accessFlag, // access
0, // no sharing
NULL, // default security attributes
OPEN_EXISTING, // opens existing pipe
FILE_FLAG_OVERLAPPED, // async
NULL) // no template file
};
return { ioContext, inHandle };
}
//note that GetErrorAsString is also needed in this main
【问题讨论】:
-
为什么要结合十几种不同的方法?具体来说,为什么要同时使用异步流和同步流?为什么要使用 Boost 的 async_pipe 以及您自己的命名管道? Boost Process 的管道有什么没有做的吗?或者从另一个角度来看:你想要实现什么,Boost 是否添加了什么?
-
嘿嘿嘿,感谢您的光临。此问题中的所有代码都使用
boost::asio::windows::stream_handle并调用async_*_someAPI。我理解这种混淆,因为我包含了async_pipe.hpp标头(将对其进行编辑),但您会注意到我实际上并没有在代码中使用它们(我在第一次尝试时使用它们代替stream_handle,但是他们根本没有工作)。我想要实现的是在 Windows(最终是 Unix)上异步使用命名管道,而无需自己编写低级代码。boost::asio是我尝试的第一件事。 -
我怀疑
async_read_some正在吞咽错误。当我在等待读取超时时尝试从另一个线程窥视管道时,出现“管道关闭”错误。但是async_read_some声称它将显示从 API 获得的任何错误。为什么它不告诉我管道已关闭?在假设同步 API 可能会给我错误的假设下,我在我的“答案”中编写了代码,然后结果发现代码有效,这并没有让我更接近于找出答案为什么异步 API 不起作用。 -
我认为 async_pipe 应该可以正常工作(无论如何它在内部使用
stream_handle,并且省去了设置管道的所有麻烦)。如果没有,那很烦人——这可能是直截了当的事情。我没有windows环境,所以我认为我无法在合理的时间范围内查看它 -
另外,
async_read_some没有吞咽错误。如果是这样,那就是一个错误(或者分配给stream_handle的句柄不符合某些记录标准?)
标签: c++ windows boost boost-asio