【问题标题】:Named pipes efficient asynchronous design命名管道高效异步设计
【发布时间】:2013-07-23 09:01:53
【问题描述】:

问题

设计一个高效且非常快速的命名管道客户端服务器框架。

当前状态

我已经拥有经过实战验证的生产测试框架。它很快,但是每个管道连接使用一个线程,如果有很多客户端,线程数可能很快就会很高。我已经使用了可以根据需要扩展的智能线程池(实际上是任务池)。

我已经对管道使用了 OVERLAPED 模式,但随后我使用 WaitForSingleObject 或 WaitForMultipleObjects 进行阻塞,这就是为什么我需要在服务器端每个连接一个线程

所需的解决方案:

客户端很好,但在服务器端,我想只为每个客户端请求而不是每个连接使用一个线程。因此,我不会在客户端的整个生命周期(连接/断开连接)中使用一个线程,而是每个任务使用一个线程。所以只有当客户端请求数据时才可以。

我在 MSDN 上看到一个示例,它使用 OVERLAPED 结构数组,然后使用 WaitForMultipleObjects 等待它们。我觉得这是一个糟糕的设计。我在这里看到两个问题。首先,您必须维护一个可以变得非常大的数组,并且删除的成本很高。其次,您有很多事件,每个数组成员都有一个。

我还看到了完成端口,例如 CreateIoCompletionPortGetQueuedCompletionStatus,但我看不出它们有什么更好的地方。

我想要的是 ReadFileExWriteFileEx 做的事情,他们调用回调例程 当操作完成时。这是一种真正的异步编程风格。但问题是 ConnectNamedPipe 不支持它,而且我看到线程需要处于警报状态,您需要调用一些 *Ex 函数来实现它。

那么如何最好地解决这样的问题?

这是 MSDN 的做法:http://msdn.microsoft.com/en-us/library/windows/desktop/aa365603(v=vs.85).aspx

我在这种方法中看到的问题是,如果WaitForMultipleObjects 的限制为 64 个句柄,我看不到如何同时连接 100 个客户端。当然我可以在每次请求后断开管道,但我的想法是像在 TCP 服务器中一样拥有一个永久的客户端连接,并在整个生命周期中跟踪客户端,每个客户端都有唯一的 ID 和客户端特定的数据。

理想的伪代码应该是这样的:

repeat
  // wait for the connection or for one client to send data
  Result = ConnectNamedPipe or ReadFile or Disconnect; 

  case Result of
    CONNECTED: CreateNewClient; // we create a new client
    DATA: AssignWorkerThread; // here we process client request in a thread
    DISCONNECT: CleanupAndDeleteClient // release the client object and data
  end;
until Aborted;

这样我们就只有一个监听线程来接受connect/disconnect/onData事件。线程池(工作线程)只处理实际请求。这样 5 个工作线程就可以为许多连接的客户端提供服务。

附: 我当前的代码应该不重要。我在 Delphi 中编写代码,但它是纯 WinAPI,所以语言无关紧要。

编辑:

目前 IOCP 看起来像解决方案:

I/O 完成端口为 在多处理器上处理多个异步 I/O 请求 系统。当一个进程创建一个 I/O 完成端口时,系统 为唯一目的是的请求创建一个关联的队列对象 为这些请求提供服务。处理许多并发的进程 异步 I/O 请求可以通过以下方式更快、更有效地做到这一点 结合预分配线程使用 I/O 完成端口 池而不是在线程收到 I/O 请求时创建线程。

【问题讨论】:

  • 另请注意WaitForMultipleObject 有 64 个句柄的限制 (MAXIMUM_WAIT_OBJECTS)!
  • 删除并不昂贵。那有什么意思?我在重叠结构上投票支持 WFMO。我看不出有什么不好的。只需要在等待数组中再添加一个事件,如果数组需要调整,或者完全中止正在进行中,它将停止等待。
  • @Dealeticus:动态数组的选择成本很高。如果删除中间的项目,则必须移动它后面的所有项目。这是编程结构 101 :) 好的,如果您有 100 个以下的项目,它不会显示。但我可以连接 100 多个客户端。
  • @Jochen Kalmbach:那会杀死那个实现:(
  • 谁投票赞成关闭? 3 票赞成关闭对我来说是一个完全有效的问题。对这个问题的 6 个赞成票和两个最爱证实了这一点。堆栈溢出一天比一天陌生。

标签: windows delphi winapi asynchronous named-pipes


【解决方案1】:

如果服务器必须处理超过 64 个事件(读/写),那么任何使用 WaitForMultipleObjects 的解决方案都变得不可行。这就是微软向 Windows 引入 IO 完成端口的原因。它可以使用最合适的线程数(通常是处理器/内核的数量)来处理非常多的 IO 操作。

IOCP的问题是很难正确实施。隐藏的问题像地雷一样在现场传播:[1]、[2](第 3.6 节)。我建议使用一些框架。谷歌搜索为 Delphi 开发人员提供了一个名为 Indy 的东西。也许还有其他人。

此时我会忽略命名管道的要求,如果这意味着编写我自己的 IOCP 实现。这不值得悲伤。

【讨论】:

  • 感谢您的回答。我认为这是正确的。我知道 Indy,并且我在其上构建了与 IPC 相同类型的框架。我的框架有两种实现,一种是基于命名管道的 IPC,另一种是 IMC(机器间通信),基于 Indy。我会听从您的警告,但仍会尝试实施。如果它不成功,那么至少我会学到一些新东西。
【解决方案2】:

我认为您忽略的是在任何给定时间您只需要几个侦听命名管道实例。管道实例连接后,您可以关闭该实例并创建一个新的侦听实例来替换它。

使用MAXIMUM_WAIT_OBJECTS(或更少)侦听命名管道实例,您可以拥有一个专用于使用WaitForMultipleObjectsEx 进行侦听的线程。同一个线程还可以使用ReadFileExWriteFileEx 和APC 处理其余的I/O。工作线程会将 APC 排队到 I/O 线程以启动 I/O,并且 I/O 线程可以使用任务池返回结果(以及让工作线程知道新连接)。

I/O 线程主函数如下所示:

create_events();
for (index = 0; index < MAXIMUM_WAIT_OBJECTS; index++) new_pipe_instance(i);

for (;;)
{
    if (service_stopping && active_instances == 0) break;

    result = WaitForMultipleObjectsEx(MAXIMUM_WAIT_OBJECTS, connect_events, 
                    FALSE, INFINITE, TRUE);

    if (result == WAIT_IO_COMPLETION) 
    {
        continue;
    }
    else if (result >= WAIT_OBJECT_0 && 
                     result < WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS) 
    {
        index = result - WAIT_OBJECT_0;
        ResetEvent(connect_events[index]);

        if (GetOverlappedResult(
                connect_handles[index], &connect_overlapped[index], 
                &byte_count, FALSE))
            {
                err = ERROR_SUCCESS;
            }
            else
            {
                err = GetLastError();
            }

        connect_pipe_completion(index, err);
        continue;
    }
    else
    {
        fail();
    }
}

唯一真正复杂的是,当您调用ConnectNamedPipe 时,它可能返回ERROR_PIPE_CONNECTED 以指示调用立即成功,或者如果调用立即失败,则返回ERROR_IO_PENDING 以外的错误。在这种情况下,您需要重置事件然后处理连接:

void new_pipe(ULONG_PTR dwParam)
{
    DWORD index = dwParam;

    connect_handles[index] = CreateNamedPipe(
        pipe_name, 
        PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_MESSAGE | PIPE_WAIT | PIPE_ACCEPT_REMOTE_CLIENTS,
        MAX_INSTANCES,
        512,
        512,
        0,
        NULL);

    if (connect_handles[index] == INVALID_HANDLE_VALUE) fail();

    ZeroMemory(&connect_overlapped[index], sizeof(OVERLAPPED));
    connect_overlapped[index].hEvent = connect_events[index];

    if (ConnectNamedPipe(connect_handles[index], &connect_overlapped[index])) 
    {
        err = ERROR_SUCCESS;
    }
    else
    {
        err = GetLastError();

        if (err == ERROR_SUCCESS) err = ERROR_INVALID_FUNCTION;

        if (err == ERROR_PIPE_CONNECTED) err = ERROR_SUCCESS;
    }

    if (err != ERROR_IO_PENDING) 
    {
        ResetEvent(connect_events[index]);
        connect_pipe_completion(index, err);
    }
}

connect_pipe_completion 函数会在任务池中创建一个新任务来处理新连接的管道实例,然后排队 APC 调用 new_pipe 在同一索引处创建一个新的监听管道。

可以在关闭现有管道实例后重用它们,但在这种情况下,我认为不值得麻烦。

【讨论】:

  • 是的,这是可能的。事实上,我现在有这样的事情。但我不喜欢的是每个新连接在整个连接期间都从池中保留一个线程。如果您有 1000 个连接,它将使用 1000 个线程,这是一个非常糟糕的设计。我可以为每个请求连接和断开连接,但是我无法在服务器端拥有活动连接和客户端的列表。我失去了对客户的跟踪,我无法拥有双重沟通渠道。
  • 顺便说一句,一个请求总是很快的。请求是二进制数据包。我的框架是面向高级别的,并从用户那里抽象出管道。看这里的代码:cromis.net/blog/downloads/cromis-ipc
  • 不,在我的设计中,每个连接不使用一个线程。当连接发生时,在任务池中创建一个任务;该任务执行任何必要的预处理(例如检查访问规则),然后根据需要启动ReadFileExWriteFileEx(通过将APC 排队到I/O 线程)然后退出。当该 I/O 完成时,会创建另一个任务来处理它,依此类推。
  • 好的,我现在明白了。是的,这是我可以做的并且避免使用 IOCP。它会按照我想要的方式工作。谢谢。它基本上仍然使用 IOCP,因为它们内置在 ReadFileEx 中,但这样我就不必手动处理它了。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-04-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-06-20
相关资源
最近更新 更多