【问题标题】:Cross Process Event - Release all waiters reliably跨进程事件 - 可靠地释放所有服务员
【发布时间】:2013-05-06 06:35:17
【问题描述】:

我通过 ManualResetEvent 创建了一个跨进程事件。当此事件确实发生时,n 个不同进程中的 n 个线程应该被解除阻塞并开始运行以获取新数据。问题在于,ManualResetEvent.Set 之后立即重置似乎不会导致所有等待线程唤醒。那里的文档很模糊

http://msdn.microsoft.com/en-us/library/windows/desktop/ms682396(v=vs.85).aspx

当手动重置事件对象的状态发出信号时,它保持 发出信号,直到它被 ResetEvent 显式重置为无信号 功能。任意数量的等待线程,或随后的线程 对指定事件对象开始等待操作,可以释放 对象的状态发出信号。

有一种名为PulseEvent 的方法似乎完全符合我的需要,但不幸的是它也存在缺陷。

等待同步对象的线程可以暂时 由内核模式 APC 从等待状态中移除,然后返回到 APC 完成后的等待状态。如果调用 PulseEvent 发生在线程已从等待中删除的时间 状态,线程不会被释放,因为 PulseEvent 释放 只有那些在它被调用时正在等待的线程。 因此,PulseEvent 是不可靠的,不应该被 new 应用程序。而是使用条件变量。

现在 MS 确实建议使用条件变量。

条件变量是启用线程的同步原语 等到特定情况发生。条件变量是 不能跨进程共享的用户模式对象。

按照文档,我似乎没有运气可靠地做到这一点。是否有一种简单的方法可以在没有规定限制的情况下通过一个 ManualResetEvent 完成相同的事情,或者我是否需要为每个侦听器进程创建一个响应事件以获取每个订阅调用者的 ACK?在那种情况下,我需要一个小的共享内存来注册订阅进程的 pid,但这似乎带来了一系列问题。当一个进程崩溃或没有响应时会发生什么? ....

提供一些上下文。我有新的状态要发布,所有其他进程应该从共享内存位置读取。当一次发生多个更新时,错过一个更新是可以的,但该过程必须至少读取最后一个最新值。我可以用超时进行轮询,但这似乎不是一个正确的解决方案。

目前我已经到了

ChangeEvent = new EventWaitHandle(false, EventResetMode.ManualReset, counterName + "_Event");

ChangeEvent.Set();
Thread.Sleep(1); // increase odds to release all waiters
ChangeEvent.Reset();

【问题讨论】:

    标签: c# .net windows multithreading


    【解决方案1】:

    处理生产者必须唤醒所有消费者并且消费者数量不断变化的情况的一个通用选项是使用移动围栏方法。此选项也需要共享内存 IPC 区域。该方法有时会导致消费者在没有工作的情况下被唤醒,尤其是在大量进程需要调度且负载很高的情况下,但它们总是会唤醒,除非在无可救药的超载机器上。

    创建几个手动重置事件,并让生产者维护一个计数器,以应对将要设置的下一个事件。除 NextToFire 事件外,所有事件均保持设置。消费者进程等待 NextToFire 事件。当生产者希望唤醒所有消费者时,它会重置 Next+1 事件并设置当前事件。所有消费者最终都会被安排,然后等待新的 NextToFire 事件。效果是只有生产者使用 ResetEvent,而消费者总是知道接下来会发生哪个事件来唤醒他们。

    所有用户初始化:(伪代码是 C/C++,不是 C#)

    // Create Shared Memory and initialise NextToFire;
    pSharedMemory = MapMySharedMemory();
    if (First to create memory) pSharedMemory->NextToFire = 0;
    
    HANDLE Array[4];
    Array[0] = CreateEvent(NULL, 1, 0, "Event1");
    Array[1] = CreateEvent(NULL, 1, 0, "Event2");
    Array[2] = CreateEvent(NULL, 1, 0, "Event3");
    Array[3] = CreateEvent(NULL, 1, 0, "Event4");
    

    制作人唤醒所有人

    long CurrentNdx = pSharedMemory->NextToFire;
    long NextNdx = (CurrentNdx+1) & 3;
    
    // Reset next event so consumers block
    ResetEvent(Array[NextNdx]);
    
    // Flag to consumers new value
    long Actual = InterlockedIncrement(&pSharedMemory->NextToFire) & 3;
    
    // Next line needed if multiple producers active.
    // Not a perfect solution
    if (Actual != NextNdx) ResetEvent(Actual);
    
    // Now wake them all up
    SetEvent(CurrentNdx);
    

    消费者等待逻辑

    long CurrentNdx = (pSharedMemory->NextToFire) & 3;
    WaitForSingleObject(Array[CurrentNdx],  Timeout);
    

    【讨论】:

    • 这是一个非常好的解决方案,可以解决我能想到的所有竞争条件。谢谢!
    【解决方案2】:

    从 .NET 4.0 开始,您可以使用 MemoryMappedFile 来同步进程内存。在这种情况下,将计数器写入 MemoryMappedFile 并从工作进程中递减它。如果计数器为零,则允许主进程重置事件。这是示例代码。

    主进程

    //number of WorkerProcess
    int numWorkerProcess = 5;
    
    //Create MemroyMappedFile object and accessor. 4 means int size.
    MemoryMappedFile mmf = MemoryMappedFile.CreateNew("test_mmf", 4);
    MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor();
    
    EventWaitHandle ChangeEvent = new EventWaitHandle(false, EventResetMode.ManualReset, counterName + "_Event");
    
    //write counter to MemoryMappedFile
    accessor.Write(0, numWorkerProcess);
    
    //.....
    
    ChangeEvent.Set();
    
    //spin wait until all workerProcesses decreament counter
    SpinWait.SpinUntil(() => {
    
        int numLeft = accessor.ReadInt32(0);
        return (numLeft == 0);
    });
    
    
    ChangeEvent.Reset();
    

    工作进程

    //Create existed MemoryMappedfile object which created by main process.
    MemoryMappedFile mmf = MemoryMappedFile.OpenExisting("test_mmf");
    MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor();
    
    //This mutex object is used for decreament counter.
    Mutex mutex = new Mutex(false, "test_mutex");
    EventWaitHandle ChangeEvent = new EventWaitHandle(false, EventResetMode.ManualReset, "start_Event");
    
    //....
    
    ChangeEvent.WaitOne();
    
    //some job...
    
    //decrement counter with mutex lock. 
    mutex.WaitOne();
    int count = accessor.ReadInt32(0);
    --count;
    accessor.Write(0, count);
    mutex.ReleaseMutex();
    /////////////////////////////////////
    

    如果环境小于 .NET 4.0,可以使用 win32 API 的 CreateFileMapping 函数来实现。

    【讨论】:

    • 这确实有效,但非常脆弱。当一个进程被杀死或挂起时,它永远不会减少全局计数器,你将永远等待。如果您使用超时,则可能(将)发生一个进程在您放弃后递减计数器。这将错误地减少“满”计数器。如果已注册的进程静默退出,您如何确定订阅者的数量以及如何处理该问题?在这种情况下,您需要根据猜测调整订阅者数量。这个解决方案的竞争条件太多了。
    • @Alois Kraus 是的,你是对的。它没有考虑工人过程中的事故。但我不知道为什么你的系统无法确定工作进程的数量。因为在大多数情况下,有控制器进程创建工作进程并管理它们。因此,这些控制器进程必须知道工作进程的数量和每个条件(包括错误)。无论如何,我认为最好的解决方案是为每个工作进程准备事件,并与控制器进程进行通信。
    • 这并不容易,因为我不知道订阅者的数量并且没有控制器。即使我知道这是错误的,因为订阅者可以随时取消订阅。这是另一个竞争条件,当您根据旧订阅计数传递事件时,订阅者会取消订阅。
    【解决方案3】:

    您写道:“PulseEvent 似乎完全符合我的需要,但不幸的是它也存在缺陷”。 PulseEvent 确实存在缺陷,但我不能同意手动重置事件存在缺陷。这是非常可靠的。只有在某些情况下您可以使用手动重置事件,而在某些情况下您不能使用它们。它不是万能的。还有很多其他工具,例如自动重置事件、管道等。

    如果您需要定期通知线程,但不需要跨进程发送数据,那么只通知线程的最佳方式是自动重置事件。您只需要每个线程都有自己的事件。所以,你有多少线程就有多少事件。

    如果您只需要向进程发送数据,最好使用命名管道。与自动重置事件不同,您不需要每个进程都有自己的管道。每个命名管道都有一个服务器和一个或多个客户端。当有许多客户端时,操作系统会为每个客户端自动创建许多相同命名管道的实例。命名管道的所有实例共享相同的管道名称,但每个实例都有自己的缓冲区和句柄,并为客户端/服务器通信提供单独的管道。实例的使用使多个管道客户端可以同时使用同一个命名管道。任何进程都可以作为一个管道的服务器和另一个管道的客户端,反之亦然,使对等通信成为可能。

    如果您将使用命名管道,则在您的场景中根本不需要事件,并且无论进程发生什么数据都将有保证的交付 - 每个进程都可能会出现长时间的延迟(例如通过交换),但数据最终将在没有您特别参与的情况下尽快交付。

    只有当通知只有一次时,所有线程(进程)的一个事件才可以。在这种情况下,您将需要手动重置事件,而不是自动重置事件。例如,如果您需要通知您的应用程序将很快退出,您可以发出这个常见的手动重置事件。但是,正如我之前所写,在您的场景中,命名管道是最佳选择。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-05-03
      • 2010-10-12
      • 2011-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-03-16
      • 1970-01-01
      相关资源
      最近更新 更多