【问题标题】:Why is my IOCompetionCallback never executed on my IO completion port?为什么我的 IOCompetionCallback 从未在我的 IO 完成端口上执行?
【发布时间】:2021-10-09 11:36:33
【问题描述】:

最小的可重现示例:

Interop.cs

public static class Interop
{
    [DllImport("kernel32.dll")]
    public static extern IntPtr CreateIoCompletionPort(
        [In] IntPtr fileHandle,
        [In] IntPtr existingCompletionPort,
        [In] UInt32 completionKey,
        [In] UInt32 numberOfConcurrentThreads);

    [DllImport("kernel32.dll")]
    public static extern UInt32 GetLastError();

    [DllImport("kernel32.dll")]
    public static unsafe extern bool GetQueuedCompletionStatus(
        [In] IntPtr completionPort,
        [Out] out UInt32 ptrBytesTransferred,
        [Out] out UInt32 ptrCompletionKey,
        [Out] NativeOverlapped** lpOverlapped,
        [In] UInt32 dwMilliseconds);

    [DllImport("kernel32.dll")]
    public static extern IntPtr CreateFile(
        [In] string fileName,
        [In] UInt32 dwDesiredAccess,
        [In] UInt32 dwShareMode,
        [In] IntPtr lpSecurityAttributes,
        [In] UInt32 dwCreationDisposition,
        [In] UInt32 dwFlagsAndAttributes,
        [In] IntPtr hTemplateFile);

    [DllImport("kernel32.dll")]
    public static unsafe extern bool ReadFile(
        [In] IntPtr hFile,
        [Out] byte[] lpBuffer,
        [In] uint maxBytesToRead,
        [Out] out UInt32 bytesActuallyRead,
        [In] NativeOverlapped* lpOverlapped);

    [DllImport("kernel32.dll")]
    public static extern bool PostQueuedCompletionStatus(
        [In] IntPtr completionPort,
        [In] UInt32 bytesTrasferred,
        [In] UInt32 completionKey,
        [In] IntPtr lpOverlapped);
}

程序.cs

class Program
{
    static unsafe void Main(string[] args)
    {
        // create completion port
        var completionPortHandle = Interop.CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, 0,  0);

        ThreadLogger.Log("Completion port handle: {0}", completionPortHandle);

        var completionPortThread = new Thread(() => new IOCompletionWorker().Start(completionPortHandle))
        {
            IsBackground = true
        };
        completionPortThread.Start();

        const uint Flags = 128 | (uint)1 << 30;

        var fileHandle = Interop.CreateFile("test.txt", (uint)1 << 31, 0, IntPtr.Zero, 3,
            /*FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED */ Flags,
            IntPtr.Zero);

        ThreadLogger.Log("File handle: {0}", fileHandle);

        Interop.CreateIoCompletionPort(
            fileHandle,
            completionPortHandle,
            (uint)fileHandle.ToInt64(), 
            0);

        ThreadLogger.Log("Associated file handle with completion port");

        var readBuffer = new byte[1024];

        uint bytesRead;

        var overlapped = new Overlapped 
        {
            AsyncResult = new FileReadAsyncResult()
            {
                ReadCallback = (bytesCount, buffer) =>
                    {
                        var contentRead = Encoding.UTF8.GetString(buffer, 0, (int)bytesCount);
                        ThreadLogger.Log(contentRead);
                    },
                Buffer = readBuffer
            } 
        };

        NativeOverlapped* nativeOverlapped = overlapped.UnsafePack((uint errorCode, uint numBytes, NativeOverlapped* pOVERLAP) =>
        {
            ThreadLogger.Log("Why am I not getting printed?");
        }, readBuffer);

        ThreadLogger.Log("Before read in main thread");

        Interop.ReadFile(fileHandle, readBuffer, (uint)readBuffer.Length, out bytesRead, nativeOverlapped);

        ThreadLogger.Log("After read in main thread");

        Console.ReadLine();
    }
}

FileReadAsyncResult.cs

class FileReadAsyncResult : IAsyncResult
{
    public bool IsCompleted { get; private set; }

    public WaitHandle AsyncWaitHandle { get; private set; }

    public object AsyncState { get; private set; }

    public bool CompletedSynchronously { get; private set; }

    public Action<uint, byte[]> ReadCallback { get; set; }

    public byte[] Buffer { get; set; }
}

IOCompletionWorker.cs

public class IOCompletionWorker
{ 
    public unsafe void Start(IntPtr completionPort)
    {
        while (true)
        {
            uint bytesRead;
            uint completionKey;
            NativeOverlapped* nativeOverlapped;

            ThreadLogger.Log("About to get queued completion status on {0}", completionPort);

            var result = Interop.GetQueuedCompletionStatus(
                completionPort, 
                out bytesRead,
                out completionKey,
                &nativeOverlapped, 
                uint.MaxValue);

            var overlapped = Overlapped.Unpack(nativeOverlapped);

            if (result)
            {
                var asyncResult = ((FileReadAsyncResult)overlapped.AsyncResult);
                asyncResult.ReadCallback(bytesRead, asyncResult.Buffer);
            }
            else
            {
                ThreadLogger.Log(Interop.GetLastError().ToString());
            }

            Overlapped.Free(nativeOverlapped);
        }
    }
}

我知道如果我将Threadpool.BindHandle 与相应的文件句柄一起使用,我的回调将被运行 - 但我试图了解为什么它在我自己的 IOCP 上注册时没有被执行正在等待完成包。 (此外,线程池不知道如何处理我的自定义 AsyncResult - 那里的回调不会被执行。)

【问题讨论】:

  • 您的 CreateIoCompletionPort PInvoke 看起来有问题。第三个参数是 ULONG_PTR,它应该是指针大小而不是 UInt32。
  • 另外,如何从 API 调用中检查错误代码?如果你想学习,你应该首先检查哪个方法返回错误。
  • @AloisKraus 在任何地方都没有返回错误。这是从here 复制过来的代码。我只是好奇为什么回调最终没有被执行。当你使用ThreadPool.BindHandle 时它会被执行,但是我的自定义FileReadAsyncResult.ReadCallback 最终不会在那里被执行。
  • @AloisKraus 我只添加了提供IOCompletionCallbackoverlapped.UnsafePack 的部分(而不是null)。这是原始源代码与此处显示的源代码之间的唯一区别。
  • IO 完成回调是线程池的一项服务。如果您自己完成 IO 完成端口,则不会收到此回调。见github.com/dotnet/runtime/blob/main/src/coreclr/vm/…

标签: c# multithreading asynchronous .net-core async-await


【解决方案1】:

IO 完成端口通过创建完成端口、将其绑定到文件句柄然后启动 n 个 IO 完成端口线程来工作,这些线程等待 GetQueuedCompletionStatus 在此阻塞调用中返回。

您在代码中提到的回调不是 IO 完成端口基础结构的一部分。它是 .NET 线程池的一项服务,您可以在其中调用 ThreadPool.BindHandle(SafeHandle osHandle),它会在此句柄的 IO 完成完成时回调您。

但是,如果您自己做所有事情,那么您将无法使用 .NET 线程池服务。 调用调用栈是

IOCompletionPort.Sample!IOCompletionPort.Sample.Program+<>c.<Main>b__0_0(UInt32, UInt32, System.Threading.NativeOverlapped*)+0x45[c:\Source\async-io-talk\src\IOCompletionPorts\IOCompletionPort.Sample\Program.cs @ 63]
mscorlib!System.Threading._IOCompletionCallback.PerformIOCompletionCallback(UInt32, UInt32, System.Threading.NativeOverlapped*)+0x84
clr!CallDescrWorkerInternal+0x83
clr!CallDescrWorkerWithHandler+0x4e
clr!DispatchCallSimple+0x67
clr!BindIoCompletionCallBack_Worker+0xee
clr!ManagedThreadBase_DispatchInner+0x40
clr!ManagedThreadBase_DispatchMiddle+0x6c
clr!ManagedThreadBase_DispatchOuter+0x4c
clr!ManagedThreadBase_FullTransitionWithAD+0x2f
clr!BindIoCompletionCallbackStubEx+0xb9
clr!BindIoCompletionCallbackStub+0x9
clr!ThreadpoolMgr::CompletionPortThreadStart+0x604
clr!Thread::intermediateThreadProc+0x8b
KERNEL32!BaseThreadInitThunk+0x14
ntdll!RtlUserThreadStart+0x21

当 IO 完成完成时,在另一个线程上调用 BindIoCompletionCallbackStub 方法

FCIMPL1(FC_BOOL_RET, ThreadPoolNative::CorPostQueuedCompletionStatus, LPOVERLAPPED lpOverlapped)
{
    FCALL_CONTRACT;

    OVERLAPPEDDATAREF   overlapped = ObjectToOVERLAPPEDDATAREF(OverlappedDataObject::GetOverlapped(lpOverlapped));

    BOOL res = FALSE;

    HELPER_METHOD_FRAME_BEGIN_RET_1(overlapped); // Eventually calls BEGIN_SO_INTOLERANT_CODE_NOTHROW

    // OS doesn't signal handle, so do it here
    overlapped->Internal = 0;

    
    
    res = ThreadpoolMgr::PostQueuedCompletionStatus(lpOverlapped, 
        BindIoCompletionCallbackStub);

这是被调用的

ThreadPoolNative::CorPostQueuedCompletionStatus, LPOVERLAPPED lpOverlapped)
ThreadpoolMgr::CallbackForContinueDrainageOfCompletionPortQueue

在一个或多个 IO 完成完成后的某个时间执行 IO 完成回调。由于您自己处理所有事情,因此没有人调用 ThreadpoolMgr::CallbackForContinueDrainageOfCompletionPortQueue 来通知您完成的 IO 完成发起的读取请求。

【讨论】:

  • 我忙得不可开交,仍然需要讨论我们在聊天中讨论的内容。非常感谢您帮助我!
  • 没问题,这是一个有趣的问题,我终于可以了解有关 IO 完成端口的更多信息。
猜你喜欢
  • 2020-09-26
  • 2014-02-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-08-05
  • 1970-01-01
相关资源
最近更新 更多