【问题标题】:Usage of ReadFile(Ex) with BindIoCompletionCallbackReadFile(Ex) 与 BindIoCompletionCallback 的使用
【发布时间】:2016-12-25 09:49:33
【问题描述】:

我正在尝试使用命名管道编写 IPC。

服务器代码:http://pastebin.com/tHyAv0e0

客户端代码:http://pastebin.com/Qd0yGBca

我的问题是关于服务器的。跟随 SO 用户,我正在尝试在服务器代码中使用 BindIoCompletionCallback() 。服务器由以下功能组成:

  • print_last_error : 打印上一个错误的可读消息
  • IocpThreadProc : 传递给 BindIoCompletionCallback() 的回调,它调用 ConnectNamedPipe() 和 ReadFile()
  • server_new : 创建命名管道并尝试连接到管道的另一端(即客户端),创建退出事件并调用 BindIoCompletionCallback()
  • server_del : 应该释放资源
  • 主函数有一个无限循环,等待退出事件发出信号。

当客户端连接时,它会发送消息“salut, c'est le client !”。我已将 ReadFile() 的缓冲区设置为 5,以测试我必须多次调用 ReadFile() 的情况。我有以下输出:

connection pending...
waiting for client...
 ** 0, 0
reading data
 * ReadFile : 0
 ** 0, 5
msg:
reading data
 ** 0, 5
 * ReadFile : 5
reading data
msg: , c'e
 * ReadFile : 5
 ** 0, 5
msg: st le
reading data
 * ReadFile : 5
 ** 0, 5
msg:  clie
reading data
 * ReadFile : 5
 ** 0, 4
msg: nt !~
reading data
IO_PENDING
 ** -1073741493, 0
reading data
unexpected error failed with error 109: Le canal de communication a ÚtÚ fermÚ.
WaitForSingleObject : 0

以**开头的行:它打印回调的参数

以 'msg' 开头的行:它打印由 Readfile 填充的缓冲区的消息

由于客户端发送的消息长度为24,我通常应该得到这5条消息(每条消息都是5个字符,除了最后一个是4个字符):

salut
, c'e
st le 
 clie
nt !

但我不能拥有消息的第一部分(即:“salut”)。当 I/O 操作完成时调用回调,可能是第一部分。但是我没有成功调用 ReadFile() 以获取消息的第一部分。我试图在主函数的主循环中、线程中、server_new() 等中调用 ReadFile()……除了正确的方法之外的所有内容。

有人知道如何解决这个问题吗?

谢谢

【问题讨论】:

    标签: windows winapi readfile named-pipes io-completion-ports


    【解决方案1】:

    您的代码包含大量基本错误。更确切地说是所有代码 - 一个完整的错误

    查看代码 sn-p(在 IocpThreadProcserver_new 中)

        char buf[READ_BUFSIZE];
        ret = ReadFileEx(svr->pipe, buf, sizeof(buf), &svr->ol, IocpThreadProc);
    

    char buf[READ_BUFSIZE] - 这是函数中的局部变量。从函数退出后 - 这将成为堆栈中的任意地址。因此,当读取操作完成时 - 这更快地破坏了您的堆栈或将是未定义的结果。所以这是错误的。您必须不将堆栈内存作为读取缓冲区传递,或者在读取操作完成之前不退出函数

    您将IocpThreadProc 作为参数传递给ReadFileEx

    lpCompletionRoutine

    指向要调用的完成例程的指针 当读取操作完成并且调用线程处于 警报等待状态。

    但你永远不会在警报状态下等待!

    以后你用

    BindIoCompletionCallback(svr->pipe, IocpThreadProc, 0);
    

    但是将文件绑定到 IOCP 并使用 APC 完成 (lpCompletionRoutine ) 是互斥的。如果说你打电话给BindIoCompletionCallback 之前 ReadFileEx(.., IocpThreadProc) - 你会得到错误ERROR_INVALID_PARAMETER

    来自 NtReadFile 源代码:

            //
            // If this file has an I/O completion port associated w/it, then
            // ensure that the caller did not supply an APC routine, as the
            // two are mutually exclusive methods for I/O completion
            // notification.
            //
    
            if (fileObject->CompletionContext && IopApcRoutinePresent( ApcRoutine )) {
                ObDereferenceObject( fileObject );
                return STATUS_INVALID_PARAMETER;
            }
    

    您的代码“工作”只是因为您在调用ReadFileEx(.., IocpThreadProc) 之后绑定了 IOCP。但是当读取操作完成时会发生什么? APC(对于IocpThreadProc)将被插入到线程和数据包排队到IOCP。所以IocpThreadProc 将被调用两次,使用相同的数据进行单次操作。它只调用了一次,因为你永远不会在警报状态下等待,也不会从线程中弹出 APC。

    您将OVERLAPPED 嵌入到服务器 - 这是错误。每个异步 I/O 都必须有 唯一 OVERLAPPED。更确切地说,您必须定义自己的类,该类继承自OVERLAPPED。在这个类中有指向Server的指针,操作代码,可能是一些额外的数据。您需要在每次 I/O 操作之前分配此结构并在完成时释放它。

    GetLastError() in IocpThreadProc !!!

    你需要在这里使用DWORD dwErrorCodeGetLastError() 没有意义,因为这里调用了另一个线程,与操作无关。并且因为这是来自内核的回调,这里真的 NTSTATUS 值在 dwErrorCode 中,但不是 win32 错误。例如,在读取完成时,您可以获得STATUS_PIPE_BROKEN 但不是ERROR_BROKEN_PIPE,但这在 MSDN 文档中已经存在很大缺陷

    代码示例:

    class __declspec(novtable) IoObject
    {
        friend struct UIRP;
    
        LONG _dwRef;
    
    public:
    
        ULONG AddRef()
        {
            return InterlockedIncrement(&_dwRef);
        }
    
        ULONG Release()
        {
            ULONG dwRef = InterlockedDecrement(&_dwRef);
    
            if (!dwRef)
            {
                delete this;
            }
    
            return dwRef;
        }
    
    protected:
    
        IoObject()
        {
            _dwRef = 1;
        }
    
        virtual ~IoObject() 
        {
        };
    
        virtual void OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered) = 0;
    };
    
    struct UIRP : OVERLAPPED
    {
        IoObject* _obj;
        PVOID _buf;
        ULONG _op;
    
        UIRP(IoObject* obj, ULONG op, PVOID buf = 0)
        {
            RtlZeroMemory(static_cast<OVERLAPPED*>(this), sizeof(OVERLAPPED));
            _obj = obj;
            obj->AddRef();
            _op = op;
            _buf = buf;
        }
    
        void CheckError(BOOL f)
        {
            if (!f)
            {
                DWORD dwErrorCode = RtlGetLastNtStatus();
    
                if (dwErrorCode != STATUS_PENDING)
                {
                    OnComplete(dwErrorCode, 0);
                }
            }
        }
    
        ~UIRP()
        {
            _obj->Release();
        }
    
        static BOOL BindIoCompletion(HANDLE hObject)
        {
            return BindIoCompletionCallback(hObject, _OnComplete, 0);
        }
    
    private:
    
        static void WINAPI _OnComplete(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)
        {
            static_cast<UIRP*>(lpOverlapped)->OnComplete(dwErrorCode, dwNumberOfBytesTransfered);
        }
    
        void OnComplete(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered)
        {
            _obj->OnComplete(dwErrorCode, _op, _buf, dwNumberOfBytesTransfered);
            delete this;
        }
    };
    
    class __declspec(novtable) CPipe : public IoObject
    {
        enum {
            pipe_connect, pipe_read, pipe_write
        };
    protected:
        HANDLE _pipe;
        PBYTE _buf;
        ULONG _dataSize;
        ULONG _bufferSize;
    
    public:
    
        CPipe()
        {
            _pipe = INVALID_HANDLE_VALUE;
            _buf = 0;
            _dataSize = 0;
            _bufferSize = 0;
        }
    
        BOOL Create(ULONG bufferSize, PCWSTR name);
    
        BOOL Listen();
    
        BOOL Write(const void* data, ULONG cb);
    
        BOOL Disconnect()
        {
            if (IsServer())
            {
                return DisconnectNamedPipe(_pipe);
            }
    
            HANDLE pipe = InterlockedExchangePointer(&_pipe, INVALID_HANDLE_VALUE);
    
            if (pipe != INVALID_HANDLE_VALUE)
            {
                CloseHandle(pipe);
            }
    
            return TRUE;
        }
    
    protected:
    
        BOOL Read();// usually never call direct
    
        virtual BOOL OnRead(PVOID buf, ULONG cbTransferred) = 0;
    
        virtual BOOL OnConnect() = 0;   
    
        virtual void OnDisconnect() = 0;
    
        virtual BOOL IsServer() = 0;
    
        virtual void OnWrite(DWORD /*dwErrorCode*/)
        {
        }
    
        virtual ~CPipe()
        {
            HANDLE pipe = InterlockedExchangePointer(&_pipe, INVALID_HANDLE_VALUE);
    
            if (pipe != INVALID_HANDLE_VALUE)
            {
                CloseHandle(pipe);
            }
    
            if (_buf)
            {
                delete _buf;
            }
        }
    
    private:
    
        virtual void OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered);
    };
    
    void CPipe::OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered)
    {
        DbgPrint("%u>%s<%p>(%x, %x, %x)\n", IsServer(), __FUNCTION__, this, dwErrorCode, op, dwNumberOfBytesTransfered);
    
        switch (op)
        {
        case pipe_read:
    
            switch(dwErrorCode) 
            {
            case STATUS_SUCCESS:
                if (OnRead(buf, dwNumberOfBytesTransfered)) Read();
                break;
    
            case STATUS_PIPE_BROKEN:        // pipe handle has been closed, server must call DisconnectNamedPipe
            case STATUS_CANCELLED:          // CancelIo[Ex] called
                Disconnect();
    
            case STATUS_PIPE_DISCONNECTED:  // server call DisconnectNamedPipe
            case STATUS_INVALID_HANDLE:     // we close handle
                OnDisconnect();
                break;
    
            default:__debugbreak();
            }
            break;
    
        case pipe_connect:
    
            switch(dwErrorCode) 
            {
            case STATUS_SUCCESS:            // ERROR_SUCCESS 
            case STATUS_PIPE_CONNECTED:     // ERROR_PIPE_CONNECTED
            case STATUS_PIPE_CLOSING:       // ERROR_NO_DATA (really client can send data before disconnect, exist sense do read)
                if (OnConnect()) Read();
                break;
            case STATUS_PIPE_BROKEN:        // server call CloseHandle before ConnectNamedPipe complete
            case STATUS_PIPE_DISCONNECTED:  // server call DisconnectNamedPipe before ConnectNamedPipe
            case STATUS_CANCELLED:          // server call CancelIo[Ex]
                break;
            default: __debugbreak();
            }
            break;
    
        case pipe_write:
            OnWrite(dwErrorCode);
            LocalFree(buf);
            break;
    
        default: __debugbreak();
        }
    }
    
    BOOL CPipe::Create(ULONG bufferSize, PCWSTR name)
    {
        if (_buf = new UCHAR[bufferSize])
        {
            _bufferSize = bufferSize;
        }
        else
        {
            return FALSE;
        }
    
        static WCHAR pipeprefix[] = L"\\\\?\\pipe\\";
        PWSTR path = (PWSTR)alloca(wcslen(name) * sizeof(WCHAR) + sizeof(pipeprefix));
        wcscat(wcscpy(path, pipeprefix), name);
    
        BOOL bServer = IsServer();
    
        _pipe = bServer 
            ?
        CreateNamedPipeW(path,
            PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
            PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
            PIPE_UNLIMITED_INSTANCES,
            PAGE_SIZE, PAGE_SIZE, INFINITE, NULL)
            :
        CreateFile(path, FILE_READ_ATTRIBUTES|FILE_READ_DATA|
            FILE_WRITE_ATTRIBUTES|FILE_WRITE_DATA, FILE_SHARE_READ|FILE_SHARE_WRITE, 0, OPEN_EXISTING,
            FILE_FLAG_OVERLAPPED, 0);
    
        if (_pipe == INVALID_HANDLE_VALUE || !UIRP::BindIoCompletion(_pipe))
        {
            return FALSE;
        }
    
        return bServer ? Listen() : OnComplete(0, pipe_connect, 0, 0), TRUE;
    }
    
    BOOL CPipe::Listen()
    {
        if (UIRP* irp = new UIRP(this, pipe_connect))
        {
            irp->CheckError(ConnectNamedPipe(_pipe, irp));
    
            return TRUE;
        }
    
        return FALSE;
    }
    
    BOOL CPipe::Read()
    {
        ULONG NumberOfBytesToRead = _bufferSize - _dataSize;
    
        if (!NumberOfBytesToRead)
        {
            return FALSE;
        }
    
        PVOID buf = _buf + _dataSize;
    
        if (UIRP* irp = new UIRP(this, pipe_read, buf))
        {
            irp->CheckError(ReadFile(_pipe, buf, NumberOfBytesToRead, 0, irp));
    
            return TRUE;
        }
    
        return FALSE;
    }
    
    BOOL CPipe::Write(const void* data, ULONG cb)
    {
        if (PVOID buf = LocalAlloc(0, cb))
        {
            if (UIRP* irp = new UIRP(this, pipe_write, buf))
            {
                memcpy(buf, data, cb);
    
                irp->CheckError(WriteFile(_pipe, buf, cb, 0, irp));
    
                return TRUE;
            }
        }
    
        return FALSE;
    }
    
    class ServerPipe : public CPipe
    {
        virtual BOOL OnRead(PVOID buf, ULONG cbTransferred)
        {
            DbgPrint("%.*s\n", cbTransferred, buf);
    
            char sz[256];
            Write(sz, 1 + sprintf(sz, "response from %p server\n", this));
    
            return TRUE;
        }
    
        virtual BOOL OnConnect()
        {
            DbgPrint("%s<%p>\n", __FUNCTION__, this);
    
            return TRUE;
        }
    
        virtual void OnDisconnect()
        {
            DbgPrint("%s<%p>\n", __FUNCTION__, this);
            Listen();//
        }
    
        virtual BOOL IsServer()
        {
            return TRUE;
        }
    
        virtual ~ServerPipe()
        {
            DbgPrint("%s<%p>\n", __FUNCTION__, this);
        }
    };
    
    class ClientPipe : public CPipe
    {
        int _n;
    
        virtual BOOL OnRead(PVOID buf, ULONG cbTransferred)
        {
            DbgPrint("%.*s\n", cbTransferred, buf);
    
            if (--_n)
            {
                char sz[256];
                Write(sz, 1 + sprintf(sz, "request[%u] from %p client\n", _n, this));
                return TRUE;
            }
            return FALSE;
        }
    
        virtual BOOL OnConnect()
        {
            DbgPrint("%s<%p>\n", __FUNCTION__, this);
    
            _n = 3;
    
            char sz[256];
            Write(sz, 1 + sprintf(sz, "hello from %p client\n", this));
    
            return TRUE;
        }
    
        virtual void OnDisconnect()
        {
            DbgPrint("%s<%p>\n", __FUNCTION__, this);
        }
    
        virtual BOOL IsServer()
        {
            return FALSE;
        }
    
        virtual ~ClientPipe()
        {
            DbgPrint("%s<%p>\n", __FUNCTION__, this);
        }
    };
    
    DWORD CALLBACK ClientThread(void* name)
    {
        int n = 2;
        do 
        {
            MessageBox(0,0,L"client",MB_ICONWARNING);
            if (ClientPipe* p = new ClientPipe)
            {
                p->Create(PAGE_SIZE, (PCWSTR)name);
                p->Release();
            }
        } while (--n);
    
        return 0;
    }
    
    void pipeTest()
    {
        static WCHAR sname[] = L"__test_pipe__";
    
        if (HANDLE hThread = CreateThread(0, 0, ClientThread, sname, 0, 0))
        {
            CloseHandle(hThread);
        }
    
        if (ServerPipe* p = new ServerPipe)
        {
            p->Create(PAGE_SIZE, sname);
            p->Release();
        }
    
        MessageBox(0,0,0,0);
    }
    

    关于DWORD dwErrorCode

    VOID CALLBACK FileIOCompletionRoutine(
      __in  DWORD dwErrorCode,
      __in  DWORD dwNumberOfBytesTransfered,
      __in  LPOVERLAPPED lpOverlapped
    );
    

    BindIoCompletionCallback 文档中存在不明确

    返回值

    如果函数成功,则返回值非零。

    如果函数失败,返回值为零。获得扩展错误 信息,调用GetLastError函数。 返回值为 NTSTATUS 错误代码。检索相应的系统错误 代码,使用RtlNtStatusToDosError函数。

    返回的值是 NTSTATUS 错误代码是什么意思?什么返回值?

    这是DWORD dwErrorCode in FileIOCompletionRoutine

    真的,我们将内核模式指针传递给IO_STATUS_BLOCKOVERLAPPED 的前两个成员实际上是IO_STATUS_BLOCK)。当异步操作完成 - 内核填充IO_STATUS_BLOCK 并将数据包排队到 IOCP(或 APC 到线程)。 ntdll 从 IOCP 中提取 PIO_STATUS_BLOCK(所以我们得到了指向传递给 I/O api 的 OVERLAPPED 的指针),并填充

    dwErrorCode = Iosb->Status, 
    dwNumberOfBytesTransfered = (ULONG)Iosb->Information, 
    lpOverlapped = (LPOVERLAPPED)Iosb; 
    

    系统不做转换

    dwErrorCode = RtlNtStatusToDosError(Iosb-&gt;Status)

    但是直接将NTSTATUS 分配给DWORD dwErrorCode - 所以在FileIOCompletionRoutine 中我们必须比较dwErrorCode 不是wi32 错误代码而是NTSTATUS 代码(来自"ntstatus.h"

    所以我们从未在 FileIOCompletionRoutine 中看到 ERROR_BROKEN_PIPEERROR_PIPE_NOT_CONNECTED,但 STATUS_PIPE_BROKENSTATUS_PIPE_DISCONNECTED


    和代码示例使用新的Thread Pool API 代替BindIoCompletionCallback。这里的一大优势是在IoCompletionCallback (PTP_WIN32_IO_CALLBACK) 回调函数到位ULONG IoResult 已经使用了win32 错误,但不是原始NTSTATUS (IoResult = RtlNtStatusToDosError(Iosb-&gt;Status) 和注意ULONG_PTR NumberOfBytesTransferred (vs ULONG dwNumberOfBytesTransfered 来自FileIOCompletionRoutine (@987654384) @) 回调函数并将其与来自IO_STATUS_BLOCKULONG_PTR Information 进行比较。)

    #define StartIo(irp, pio, f) StartThreadpoolIo(_pio); irp->CheckError(f, _pio);
    
    class __declspec(novtable) IoObject
    {
        friend struct UIRP;
    
        LONG _dwRef;
    
    public:
    
        ULONG AddRef()
        {
            return InterlockedIncrement(&_dwRef);
        }
    
        ULONG Release()
        {
            ULONG dwRef = InterlockedDecrement(&_dwRef);
    
            if (!dwRef)
            {
                delete this;
            }
    
            return dwRef;
        }
    
    protected:
    
        IoObject()
        {
            _dwRef = 1;
        }
    
        virtual ~IoObject() 
        {
        };
    
        virtual void OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered) = 0;
    };
    
    struct UIRP : OVERLAPPED
    {
        IoObject* _obj;
        PVOID _buf;
        ULONG _op;
    
        UIRP(IoObject* obj, ULONG op, PVOID buf = 0)
        {
            RtlZeroMemory(static_cast<OVERLAPPED*>(this), sizeof(OVERLAPPED));
            _obj = obj;
            obj->AddRef();
            _op = op;
            _buf = buf;
        }
    
        void CheckError(BOOL f, PTP_IO pio)
        {
            if (!f)
            {
                DWORD dwErrorCode = GetLastError();
    
                if (dwErrorCode != ERROR_IO_PENDING)
                {
                    CancelThreadpoolIo(pio);
                    OnComplete(dwErrorCode, 0);
                }
            }
        }
    
        ~UIRP()
        {
            _obj->Release();
        }
    
        static PTP_IO BindIoCompletion(HANDLE hObject)
        {
            return CreateThreadpoolIo(hObject, _IoCompletionCallback, 0, 0);
        }
    
    private:
    
        static VOID CALLBACK _IoCompletionCallback(
            __inout      PTP_CALLBACK_INSTANCE /*Instance*/,
            __inout_opt  PVOID /*Context*/,
            __inout_opt  PVOID Overlapped,
            __in         ULONG IoResult,
            __in         ULONG_PTR NumberOfBytesTransferred,
            __inout      PTP_IO /*Io*/
            )
        {
            static_cast<UIRP*>(Overlapped)->OnComplete(IoResult, (ULONG)NumberOfBytesTransferred);
        }
    
        void OnComplete(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered)
        {
            _obj->OnComplete(dwErrorCode, _op, _buf, dwNumberOfBytesTransfered);
            delete this;
        }
    };
    
    class __declspec(novtable) CPipe : public IoObject
    {
        enum {
            pipe_connect, pipe_read, pipe_write
        };
    protected:
        HANDLE _pipe;
        PTP_IO _pio;
        PBYTE _buf;
        ULONG _dataSize;
        ULONG _bufferSize;
    
    public:
    
        CPipe()
        {
            _pipe = INVALID_HANDLE_VALUE;
            _buf = 0;
            _dataSize = 0;
            _bufferSize = 0;
            _pio = 0;
        }
    
        BOOL Create(ULONG bufferSize, PCWSTR name);
    
        BOOL Listen();
    
        BOOL Write(const void* data, ULONG cb);
    
        BOOL Disconnect()
        {
            if (IsServer())
            {
                return DisconnectNamedPipe(_pipe);
            }
    
            HANDLE pipe = InterlockedExchangePointer(&_pipe, INVALID_HANDLE_VALUE);
    
            if (pipe != INVALID_HANDLE_VALUE)
            {
                CloseHandle(pipe);
            }
    
            return TRUE;
        }
    
    protected:
    
        BOOL Read();// usually never call direct
    
        virtual BOOL OnRead(PVOID buf, ULONG cbTransferred) = 0;
    
        virtual BOOL OnConnect() = 0;   
    
        virtual void OnDisconnect() = 0;
    
        virtual BOOL IsServer() = 0;
    
        virtual void OnWrite(DWORD /*dwErrorCode*/)
        {
        }
    
        virtual ~CPipe()
        {
            if (_pio)
            {
                CloseThreadpoolIo(_pio);
            }
    
            HANDLE pipe = InterlockedExchangePointer(&_pipe, INVALID_HANDLE_VALUE);
    
            if (pipe != INVALID_HANDLE_VALUE)
            {
                CloseHandle(pipe);
            }
    
            if (_buf)
            {
                delete _buf;
            }
        }
    
    private:
    
        virtual void OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered);
    };
    
    void CPipe::OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered)
    {
        DbgPrint("%u>%s<%p>(%x, %x, %x)\n", IsServer(), __FUNCTION__, this, dwErrorCode, op, dwNumberOfBytesTransfered);
    
        switch (op)
        {
        case pipe_read:
    
            switch(dwErrorCode) 
            {
            case ERROR_SUCCESS:
                if (OnRead(buf, dwNumberOfBytesTransfered)) Read();
                break;
    
            case ERROR_BROKEN_PIPE:         // pipe handle has been closed , server must call DisconnectNamedPipe
            case ERROR_OPERATION_ABORTED:   // CancelIo[Ex] called
                Disconnect();
    
            case ERROR_PIPE_NOT_CONNECTED:  // server call DisconnectNamedPipe
            case ERROR_INVALID_HANDLE:      // we close handle
                OnDisconnect();
                break;
    
            default:__debugbreak();
            }
            break;
    
        case pipe_connect:
    
            switch(dwErrorCode) 
            {
            case ERROR_SUCCESS:             // client just connected 
            case ERROR_PIPE_CONNECTED:      // client already connected
            case ERROR_NO_DATA:             // client already connected and disconnected (really client can send data before disconnect, exist sense do read)
                if (OnConnect()) Read();
                break;
            case ERROR_BROKEN_PIPE:         // server call CloseHandle before ConnectNamedPipe complete
            case ERROR_PIPE_NOT_CONNECTED:  // server call DisconnectNamedPipe before ConnectNamedPipe
            case ERROR_OPERATION_ABORTED:   // server call CancelIo[Ex]
                break;
            default: __debugbreak();
            }
            break;
    
        case pipe_write:
            OnWrite(dwErrorCode);
            LocalFree(buf);
            break;
    
        default: __debugbreak();
        }
    }
    
    BOOL CPipe::Create(ULONG bufferSize, PCWSTR name)
    {
        if (_buf = new UCHAR[bufferSize])
        {
            _bufferSize = bufferSize;
        }
        else
        {
            return FALSE;
        }
    
        static WCHAR pipeprefix[] = L"\\\\?\\pipe\\";
        PWSTR path = (PWSTR)alloca(wcslen(name) * sizeof(WCHAR) + sizeof(pipeprefix));
        wcscat(wcscpy(path, pipeprefix), name);
    
        BOOL bServer = IsServer();
    
        _pipe = bServer 
            ?
        CreateNamedPipeW(path,
            PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
            PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
            PIPE_UNLIMITED_INSTANCES,
            PAGE_SIZE, PAGE_SIZE, INFINITE, NULL)
            :
        CreateFile(path, FILE_READ_ATTRIBUTES|FILE_READ_DATA|
            FILE_WRITE_ATTRIBUTES|FILE_WRITE_DATA, FILE_SHARE_READ|FILE_SHARE_WRITE, 0, OPEN_EXISTING,
            FILE_FLAG_OVERLAPPED, 0);
    
        if (_pipe == INVALID_HANDLE_VALUE || !(_pio = UIRP::BindIoCompletion(_pipe)))
        {
            return FALSE;
        }
    
        return bServer ? Listen() : OnComplete(0, pipe_connect, 0, 0), TRUE;
    }
    
    BOOL CPipe::Listen()
    {
        if (UIRP* irp = new UIRP(this, pipe_connect))
        {
            StartIo(irp, _pio, ConnectNamedPipe(_pipe, irp));
    
            return TRUE;
        }
    
        return FALSE;
    }
    
    BOOL CPipe::Read()
    {
        ULONG NumberOfBytesToRead = _bufferSize - _dataSize;
    
        if (!NumberOfBytesToRead)
        {
            return FALSE;
        }
    
        PVOID buf = _buf + _dataSize;
    
        if (UIRP* irp = new UIRP(this, pipe_read, buf))
        {
            StartIo(irp, _pio, ReadFile(_pipe, buf, NumberOfBytesToRead, 0, irp));
    
            return TRUE;
        }
    
        return FALSE;
    }
    
    BOOL CPipe::Write(const void* data, ULONG cb)
    {
        if (PVOID buf = LocalAlloc(0, cb))
        {
            if (UIRP* irp = new UIRP(this, pipe_write, buf))
            {
                memcpy(buf, data, cb);
    
                StartIo(irp, _pio, WriteFile(_pipe, buf, cb, 0, irp));
    
                return TRUE;
            }
        }
    
        return FALSE;
    }
    
    class ServerPipe : public CPipe
    {
        virtual BOOL OnRead(PVOID buf, ULONG cbTransferred)
        {
            DbgPrint("%.*s\n", cbTransferred, buf);
    
            char sz[256];
            Write(sz, 1 + sprintf(sz, "response from %p server\n", this));
    
            return TRUE;
        }
    
        virtual BOOL OnConnect()
        {
            DbgPrint("%s<%p>\n", __FUNCTION__, this);
    
            return TRUE;
        }
    
        virtual void OnDisconnect()
        {
            DbgPrint("%s<%p>\n", __FUNCTION__, this);
            Listen();//
        }
    
        virtual BOOL IsServer()
        {
            return TRUE;
        }
    
        virtual ~ServerPipe()
        {
            DbgPrint("%s<%p>\n", __FUNCTION__, this);
        }
    };
    
    class ClientPipe : public CPipe
    {
        int _n;
    
        virtual BOOL OnRead(PVOID buf, ULONG cbTransferred)
        {
            DbgPrint("%.*s\n", cbTransferred, buf);
    
            if (--_n)
            {
                char sz[256];
                Write(sz, 1 + sprintf(sz, "request[%u] from %p client\n", _n, this));
                return TRUE;
            }
    
            return FALSE;
        }
    
        virtual BOOL OnConnect()
        {
            DbgPrint("%s<%p>\n", __FUNCTION__, this);
    
            _n = 3;
    
            char sz[256];
            Write(sz, 1 + sprintf(sz, "hello from %p client\n", this));
    
            return TRUE;
        }
    
        virtual void OnDisconnect()
        {
            DbgPrint("%s<%p>\n", __FUNCTION__, this);
        }
    
        virtual BOOL IsServer()
        {
            return FALSE;
        }
    
        virtual ~ClientPipe()
        {
            DbgPrint("%s<%p>\n", __FUNCTION__, this);
        }
    };
    
    DWORD CALLBACK ClientThread(void* name)
    {
        int n = 2;
        do 
        {
            MessageBox(0,0,L"client",MB_ICONWARNING);
            if (ClientPipe* p = new ClientPipe)
            {
                p->Create(PAGE_SIZE, (PCWSTR)name);
                p->Release();
            }
        } while (--n);
    
        return 0;
    }
    
    void pipeTest()
    {
        static WCHAR sname[] = L"__test_pipe__";
    
        if (HANDLE hThread = CreateThread(0, 0, ClientThread, sname, 0, 0))
        {
            CloseHandle(hThread);
        }
    
        if (ServerPipe* p = new ServerPipe)
        {
            p->Create(PAGE_SIZE, sname);
            p->Release();
        }
    
        MessageBox(0,0,0,0);
    }
    

    【讨论】:

    • 对于 buf 注释,我知道局部变量是什么。如果您更仔细地查看代码,您会发现 Server 结构有 2 个字段(数据和大小),并且宏 LOCAL_APPEND_DATA 将 buf 附加到数据字段。所以这根本不是问题......
    • @vtorri - 没有。您将指向此缓冲区的指针传递给 ReadFileEx ?LOCAL_APPEND_DATA 此处绝对无关。您将 buf 作为输入参数传递给 ReadFile(svr-&gt;pipe, buf, sizeof(buf), NULL, lpOverlapped); - 所以 buf 必须始终有效,直到操作未完成。但你从 func 退出。你必须明白这一点。这很重要
    • 对于 server_new() 中的 ReadFileEx(),这是我尝试使其工作的一种尝试。关于独特的重叠,我不知道(我是 iocp 的新手)。你说我需要在每次 I/O 之前分配这个结构。那么,实际上,我应该在代码中分配它吗?
    • 对于 buf, , ok.所以将 buf 添加到 Server 结构应该就足够了,对吧?
    • 重叠的需要在每个 I/O 操作之前分配。
    猜你喜欢
    • 2012-01-30
    • 1970-01-01
    • 2012-10-01
    • 1970-01-01
    • 1970-01-01
    • 2015-03-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多