IOCP是一种高性能的I/O模型,更多资料可以google下。
    在.Net Framework下,没有提供IOCP的类库,我们需要引入Win32 API来建立IOCP Thread Pool。

[DllImport("Kernel32", CharSet = CharSet.Auto)]
            private static extern SafeFileHandle CreateIoCompletionPort(IntPtr hFile, IntPtr hExistingCompletionPort, IntPtr puiCompletionKey, UInt32 uiNumberOfConcurrentThreads);
            /// <summary> Win32Func: Closes an IO Completion Port Thread Pool </summary>
            [DllImport("Kernel32", CharSet = CharSet.Auto)]
            private static extern Boolean CloseHandle(SafeHandle hObject);
            /// <summary> Win32Func: Posts a context based event into an IO Completion Port Thread Pool </summary>
            [DllImport("Kernel32", CharSet = CharSet.Auto)]
            private static extern Boolean PostQueuedCompletionStatus(SafeFileHandle hCompletionPort, UInt32 uiSizeOfArgument, IntPtr dwCompletionKey, IntPtr pOverlapped);
            /// <summary> Win32Func: Waits on a context based event from an IO Completion Port Thread Pool.
            ///           All threads in the pool wait in this Win32 Function </summary>
            [DllImport("Kernel32", CharSet = CharSet.Auto)]
            private static extern Boolean GetQueuedCompletionStatus(SafeFileHandle hCompletionPort, out UInt32 pSizeOfArgument, out IntPtr dwCompletionKey, out IntPtr ppOverlapped, UInt32 uiMilliseconds);

    我们用CreateIOCompletionPort获得一个IOCP对象的句柄,用PostQueuedCompetionStatus把状态对象(Socket编程下一般系传socket)放进队列,开启一定量线程来运行GetQueuedCompletionStatus监听,GetQueuedCompletionStatus函数会阻塞调用线程。

    由于与非托管代码打交道,要实现Safe的代码,有几点需要注意。

    1.我们要传递一个状态对象地址给非托管代码,由于GC的关系,我们不能直接传递地址,因为GC在回收的过程中,会移动在堆上的对象,造成地址改变。一般来说,GC移动会修改托管代码里面地址指向,但我们现在把地址传递出托管代码范围,GC也无能为力了。情况如图。

IOCP Thread Pool 在 C# 的Safe实现 IOCP Thread Pool 在 C# 的Safe实现

    针对这种情况,可以用GCHandle类解决,GCHandle类的Alloc方法为对象注册,Alloc方法有两个参数,第二个参数系GCHandleType类型枚举,默认情况系Normal。当我们要GC不移动对象的时候,例如有个byte[]的Buffer需要非托管代码填充,可以使用Pinned。

GCHandle gch = GCHandle.Alloc(obj);
            PostQueuedCompletionStatus(GetHandle, (uint)Marshal.SizeOf(gch), IntPtr.Zero, (IntPtr)gch);


    2.PostQueuedCompletionStatus函数的第二个参数是要传送数据的长度,直接使用sizeof系unsafe代码,这里使用Marshal.SizeOf方法。

    3.还有需要注意的是,传递的对象必须实现[StructLayout(LayoutKind.Sequential)]标签,以保证该对象的成员在内存里连续分配,遵守C++方式。

    4.利用SafeFileHandle引用内核对象更安全,这个类能实现引用计数。

    至此,已经讲述完实现Safe代码的要点了。

IOCPThreadPool的实现代码:

 

 Continuum.SafeThreading
{

    [StructLayout(LayoutKind.Sequential)]
    public class MyData
    {
        
private int value;

        
public int Value
        {
            
get { return value; }
            
set { this.value = value; }
        }
    }

    
// Classes
    
//============================================
    /// <summary> This class provides the ability to create a thread pool to manage work.  The
    
///           class abstracts the Win32 IOCompletionPort API so it requires the use of
    
///           unmanaged code.  Unfortunately the .NET framework does not provide this functionality </summary>
    public sealed class SafeIOCPThreadPool
    {

        
// Win32 Function Prototypes
        /// <summary> Win32Func: Create an IO Completion Port Thread Pool </summary>
        [DllImport("Kernel32", CharSet = CharSet.Auto)]
        
private static extern SafeFileHandle CreateIoCompletionPort(IntPtr hFile, IntPtr hExistingCompletionPort, IntPtr puiCompletionKey, UInt32 uiNumberOfConcurrentThreads);

        
/// <summary> Win32Func: Closes an IO Completion Port Thread Pool </summary>
        [DllImport("Kernel32", CharSet = CharSet.Auto)]
        
private static extern Boolean CloseHandle(SafeHandle hObject);

        
/// <summary> Win32Func: Posts a context based event into an IO Completion Port Thread Pool </summary>
        [DllImport("Kernel32", CharSet = CharSet.Auto)]
        
private static extern Boolean PostQueuedCompletionStatus(SafeFileHandle hCompletionPort, UInt32 uiSizeOfArgument, IntPtr dwCompletionKey, IntPtr pOverlapped);

        
/// <summary> Win32Func: Waits on a context based event from an IO Completion Port Thread Pool.
        
///           All threads in the pool wait in this Win32 Function </summary>
        [DllImport("Kernel32", CharSet = CharSet.Auto)]
        
private static extern Boolean GetQueuedCompletionStatus(SafeFileHandle hCompletionPort, out UInt32 pSizeOfArgument, out IntPtr dwCompletionKey, out IntPtr ppOverlapped, UInt32 uiMilliseconds);

        
// Constants
        /// <summary> SimTypeConst: This represents the Win32 Invalid Handle Value Macro </summary>
        private readonly IntPtr INVALID_HANDLE_VALUE = new IntPtr(-1);

        
/// <summary> SimTypeConst: This represents the Win32 INFINITE Macro </summary>
        private readonly UInt32 INIFINITE = 0xffffffff;

        
/// <summary> SimTypeConst: This tells the IOCP Function to shutdown </summary>
        private readonly IntPtr SHUTDOWN_IOCPTHREAD = new IntPtr(0x7fffffff);


        
// Delegate Function Types
        /// <summary> DelType: This is the type of user function to be supplied for the thread pool </summary>
        public delegate void USER_FUNCTION(MyData obj);


        
// Private Properties
        private SafeFileHandle m_hHandle;
        
/// <summary> SimType: Contains the IO Completion Port Thread Pool handle for this instance </summary>
        private SafeFileHandle GetHandle { get { return m_hHandle; } set { m_hHandle = value; } }

        
private Int32 m_uiMaxConcurrency;
        
/// <summary> SimType: The maximum number of threads that may be running at the same time </summary>
        private Int32 GetMaxConcurrency { get { return m_uiMaxConcurrency; } set { m_uiMaxConcurrency = value; } }

        
private Int32 m_iMinThreadsInPool;
        
/// <summary> SimType: The minimal number of threads the thread pool maintains </summary>
        private Int32 GetMinThreadsInPool { get { return m_iMinThreadsInPool; } set { m_iMinThreadsInPool = value; } }

        
private Int32 m_iMaxThreadsInPool;
        
/// <summary> SimType: The maximum number of threads the thread pool maintains </summary>
        private Int32 GetMaxThreadsInPool { get { return m_iMaxThreadsInPool; } set { m_iMaxThreadsInPool = value; } }

        
private Object m_pCriticalSection;
        
/// <summary> RefType: A serialization object to protect the class state </summary>
        private Object GetCriticalSection { get { return m_pCriticalSection; } set { m_pCriticalSection = value; } }

        
private USER_FUNCTION m_pfnUserFunction;
        
/// <summary> DelType: A reference to a user specified function to be call by the thread pool </summary>
        private USER_FUNCTION GetUserFunction { get { return m_pfnUserFunction; } set { m_pfnUserFunction = value; } }

        
private Boolean m_bDisposeFlag;
        
/// <summary> SimType: Flag to indicate if the class is disposing </summary>
        private Boolean IsDisposed { get { return m_bDisposeFlag; } set { m_bDisposeFlag = value; } }

        
// Public Properties
        private Int32 m_iCurThreadsInPool;
        
/// <summary> SimType: The current number of threads in the thread pool </summary>
        public Int32 GetCurThreadsInPool { get { return m_iCurThreadsInPool; } set { m_iCurThreadsInPool = value; } }
        
/// <summary> SimType: Increment current number of threads in the thread pool </summary>
        private Int32 IncCurThreadsInPool() { return Interlocked.Increment(ref m_iCurThreadsInPool); }
        
/// <summary> SimType: Decrement current number of threads in the thread pool </summary>
        private Int32 DecCurThreadsInPool() { return Interlocked.Decrement(ref m_iCurThreadsInPool); }
        
private Int32 m_iActThreadsInPool;
        
/// <summary> SimType: The current number of active threads in the thread pool </summary>
        public Int32 GetActThreadsInPool { get { return m_iActThreadsInPool; } set { m_iActThreadsInPool = value; } }
        
/// <summary> SimType: Increment current number of active threads in the thread pool </summary>
        private Int32 IncActThreadsInPool() { return Interlocked.Increment(ref m_iActThreadsInPool); }
        
/// <summary> SimType: Decrement current number of active threads in the thread pool </summary>
        private Int32 DecActThreadsInPool() { return Interlocked.Decrement(ref m_iActThreadsInPool); }
        
private Int32 m_iCurWorkInPool;
        
/// <summary> SimType: The current number of Work posted in the thread pool </summary>
        public Int32 GetCurWorkInPool { get { return m_iCurWorkInPool; } set { m_iCurWorkInPool = value; } }
        
/// <summary> SimType: Increment current number of Work posted in the thread pool </summary>
        private Int32 IncCurWorkInPool() { return Interlocked.Increment(ref m_iCurWorkInPool); }
        
/// <summary> SimType: Decrement current number of Work posted in the thread pool </summary>
        private Int32 DecCurWorkInPool() { return Interlocked.Decrement(ref m_iCurWorkInPool); }



        
// Constructor, Finalize, and Dispose 
        
//***********************************************
        /// <summary> Constructor </summary>
        
/// <param name = "iMaxConcurrency"> SimType: Max number of running threads allowed </param>
        
/// <param name = "iMinThreadsInPool"> SimType: Min number of threads in the pool </param>
        
/// <param name = "iMaxThreadsInPool"> SimType: Max number of threads in the pool </param>
        
/// <param name = "pfnUserFunction"> DelType: Reference to a function to call to perform work </param>
        
/// <exception cref = "Exception"> Unhandled Exception </exception>
        public SafeIOCPThreadPool(Int32 iMaxConcurrency, Int32 iMinThreadsInPool, Int32 iMaxThreadsInPool, USER_FUNCTION pfnUserFunction)
        {
            
try
            {
                
// Set initial class state
                GetMaxConcurrency = iMaxConcurrency;
                GetMinThreadsInPool 
= iMinThreadsInPool;
                GetMaxThreadsInPool 
= iMaxThreadsInPool;
                GetUserFunction 
= pfnUserFunction;
                
// Init the thread counters
                GetCurThreadsInPool = 0;
                GetActThreadsInPool 
= 0;
                GetCurWorkInPool 
= 0;
                
// Initialize the Monitor Object
                GetCriticalSection = new Object();
                
// Set the disposing flag to false
                IsDisposed = false;

                
// Create an IO Completion Port for Thread Pool use
                GetHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, IntPtr.Zero, IntPtr.Zero, (UInt32)GetMaxConcurrency);

                
// Test to make sure the IO Completion Port was created
                if (GetHandle.IsInvalid)
                    
throw new Exception("Unable To Create IO Completion Port");
                
// Allocate and start the Minimum number of threads specified
                Int32 iStartingCount = GetCurThreadsInPool;
                ThreadStart tsThread 
= new ThreadStart(IOCPFunction);
                
for (Int32 iThread = 0; iThread < GetMinThreadsInPool; ++iThread)
                {
                    
// Create a thread and start it
                    Thread thThread = new Thread(tsThread);
                    thThread.Name 
= "IOCP " + thThread.GetHashCode();
                    thThread.Start();
                    
// Increment the thread pool count
                    IncCurThreadsInPool();
                    Console.WriteLine(thThread.Name);
                }
            }
            
catch (Exception)
            {
                
                
throw;
            }
                
            
        }

        
//***********************************************
        /// <summary> Finalize called by the GC </summary>
        ~SafeIOCPThreadPool()
        {
            
if (!IsDisposed)
                Dispose();
        }

        
//**********************************************
        /// <summary> Called when the object will be shutdown.  This
        
///           function will wait for all of the work to be completed
        
///           inside the queue before completing </summary>
        public void Dispose()
        {
            
try
            {
                
// Flag that we are disposing this object
                IsDisposed = true;
                
// Get the current number of threads in the pool
                Int32 iCurThreadsInPool = GetCurThreadsInPool;
                
// Shutdown all thread in the pool
                for (Int32 iThread = 0; iThread < iCurThreadsInPool; ++iThread)
                {
                    
bool bret = PostQueuedCompletionStatus(GetHandle, 4, SHUTDOWN_IOCPTHREAD, IntPtr.Zero);
                }
                
// Wait here until all the threads are gone
                while (GetCurThreadsInPool != 0) Thread.Sleep(100);

                    
// Close the IOCP Handle
                CloseHandle(GetHandle);
            }
            
catch
            {
            }
        }

        
// Private Methods
        
//*******************************************
        /// <summary> IOCP Worker Function that calls the specified user function </summary>
        private void IOCPFunction()
        {
            UInt32 uiNumberOfBytes;
            IntPtr dwCompletionKey;
            IntPtr lpOverlapped;
            
try
            {
                
while (true)
                {

                    
// Wait for an event
                    GetQueuedCompletionStatus(GetHandle, out uiNumberOfBytes, out dwCompletionKey, out lpOverlapped, INIFINITE);

                    
if(uiNumberOfBytes <= 0)
                    {
                        
continue;
                    }

                    
// Decrement the number of events in queue
                    DecCurWorkInPool();
                    
// Was this thread told to shutdown
                    if (dwCompletionKey == SHUTDOWN_IOCPTHREAD)
                        
break;
                    
// Increment the number of active threads
                    IncActThreadsInPool();
                    
try
                    {
                        
// Call the user function
                        GCHandle gch = GCHandle.FromIntPtr(lpOverlapped);
                        MyData obj 
= (MyData) gch.Target;
                        GetUserFunction(obj);
                    }
                    
catch
                    {
                        
throw;
                    }
                    
// Get a lock
                    Monitor.Enter(GetCriticalSection);
                    
try
                    {
                        
// If we have less than max threads currently in the pool
                        if (GetCurThreadsInPool < GetMaxThreadsInPool)
                        {
                            
// Should we add a new thread to the pool
                            if (GetActThreadsInPool == GetCurThreadsInPool)
                            {
                                
if (IsDisposed == false)
                                {
                                    
// Create a thread and start it
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                    Thread thThread 
= new Thread(tsThread);
                                    thThread.Name 
= "IOCP " + thThread.GetHashCode();
                                    thThread.Start();
                                    
// Increment the thread pool count
                                    IncCurThreadsInPool();
                                }
                            }
                        }
                    }
                    
catch
                    {
                    }
                    
// Relase the lock
                    Monitor.Exit(GetCriticalSection);
                    
// Increment the number of active threads
                    DecActThreadsInPool();
                }
            }
            
catch
            {
            }
            
// Decrement the thread pool count
            DecCurThreadsInPool();
        }

        
// Public Methods
        
//******************************************
        /// <summary> IOCP Worker Function that calls the specified user function </summary>
        
/// <param name="obj"> SimType: A value to be passed with the event </param>
        
/// <exception cref = "Exception"> Unhandled Exception </exception>
        public void PostEvent(MyData obj)
        {
            
try
            {
                
// Only add work if we are not disposing
                if (IsDisposed == false)
                {
                        
// Post an event into the IOCP Thread Pool

                    GCHandle gch 
= GCHandle.Alloc(obj);
                    PostQueuedCompletionStatus(GetHandle, (
uint)Marshal.SizeOf(gch), IntPtr.Zero, (IntPtr)gch);

                    
// Increment the number of item of work
                    IncCurWorkInPool();
                    
// Get a lock
                    Monitor.Enter(GetCriticalSection);
                    
try
                    {
                        
// If we have less than max threads currently in the pool
                        if (GetCurThreadsInPool < GetMaxThreadsInPool)
                        {
                            
// Should we add a new thread to the pool
                            if (GetActThreadsInPool == GetCurThreadsInPool)
                            {
                                
if (IsDisposed == false)
                                {
                                    
// Create a thread and start it
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                    Thread thThread 
= new Thread(tsThread);
                                    thThread.Name 
= "IOCP " + thThread.GetHashCode();
                                    thThread.Start();
                                    
// Increment the thread pool count
                                    IncCurThreadsInPool();
                                }
                            }
                        }
                    }
                    
catch
                    {
                    }
                    
// Release the lock
                    Monitor.Exit(GetCriticalSection);
                }
            }
            
catch (Exception e)
            {
                
throw e;
            }
            
catch
            {
                
throw new Exception("Unhandled Exception");
            }
        }
        
//*****************************************
        /// <summary> IOCP Worker Function that calls the specified user function </summary>
        
/// <exception cref = "Exception"> Unhandled Exception </exception>
        public void PostEvent()
        {
            
try
            {
                
// Only add work if we are not disposing
                if (IsDisposed == false)
                {
                        
// Post an event into the IOCP Thread Pool
                    PostQueuedCompletionStatus(GetHandle, 0, IntPtr.Zero, IntPtr.Zero);

                    
// Increment the number of item of work
                    IncCurWorkInPool();
                    
// Get a lock
                    Monitor.Enter(GetCriticalSection);
                    
try
                    {
                        
// If we have less than max threads currently in the pool
                        if (GetCurThreadsInPool < GetMaxThreadsInPool)
                        {
                            
// Should we add a new thread to the pool
                            if (GetActThreadsInPool == GetCurThreadsInPool)
                            {
                                
if (IsDisposed == false)
                                {
                                    
// Create a thread and start it
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                    Thread thThread 
= new Thread(tsThread);
                                    thThread.Name 
= "IOCP " + thThread.GetHashCode();
                                    thThread.Start();
                                    
// Increment the thread pool count
                                    IncCurThreadsInPool();
                                }
                            }
                        }
                    }
                    
catch
                    {
                    }
                    
// Release the lock
                    Monitor.Exit(GetCriticalSection);
                }
            }
            
catch (Exception e)
            {
                
throw e;
            }
            
catch
            {
                
throw new Exception("Unhandled Exception");
            }
        }
    }
}


测试代码:
 SafeSample
{
    //============================================
    /// <summary> Sample class for the threading class </summary>
    public class UtilThreadingSample
    {
        
//******************************************* 
        /// <summary> Test Method </summary>
        static void Main()
        {
            
// Create the MSSQL IOCP Thread Pool
            SafeIOCPThreadPool pThreadPool = new SafeIOCPThreadPool(0510new SafeIOCPThreadPool.USER_FUNCTION(IOCPThreadFunction));
            
for (int i = 0; i < 100; i++)
            {
                pThreadPool.PostEvent(
new MyData(){Value = i});
            }
            pThreadPool.Dispose();
            Console.WriteLine(
"Disposed");
            Console.ReadLine();
        }

        
private static object syncRoot = new object();

        
//*****************************************
        /// <summary> Function to be called by the IOCP thread pool.  Called when
        
///           a command is posted for processing by the SocketManager </summary>
        
/// <param name="obj"> The value provided by the thread posting the event </param>
        static public void IOCPThreadFunction(MyData obj)
        {
            
try
            {
                    Console.WriteLine(
"Value: {0},Thread:{1}", obj.Value, Thread.CurrentThread.Name);
            }
            
catch (Exception pException)
            {
                Console.WriteLine(pException.Message);
            }
        }
    }
}

 

参考资料:

IOCP Thread Pooling in C# - Part I
http://www.devarticles.com/c/a/C-Sharp/IOCP-Thread-Pooling-in-C-sharp-Part-I/

IOCP Thread Pooling in C# - Part II
http://www.devarticles.com/c/a/C-Sharp/IOCP-Thread-Pooling-in-C-sharp-Part-II/

蛙蛙推荐:在c#使用IOCP(完成端口)的简单示例
http://www.cnblogs.com/onlytiancai/archive/2009/01/05/1241571.html

相关文章:

  • 2021-07-28
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-01-21
  • 2022-12-23
  • 2021-12-05
猜你喜欢
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-11-19
  • 2022-12-23
  • 2021-09-18
  • 2021-07-24
相关资源
相似解决方案