【问题标题】:Raising events on separate thread在单独的线程上引发事件
【发布时间】:2013-09-23 18:40:29
【问题描述】:

我正在开发一个组件,它需要处理实时提要并以非常快的方式将数据广播给听众(精度约为 100 纳秒级,如果我能做到这一点,甚至会更低)目前我正在提出一个订阅者可以订阅的我的代码中的事件。但是,因为在 C# 事件处理程序中运行在引发事件的同一线程上,所以引发事件的我的线程将被阻塞,直到所有订阅者完成处理事件。我无法控制订阅者的代码,因此他们可能会在事件处理程序中执行任何耗时的操作,这可能会阻塞正在广播的线程。

我该怎么做才能将数据广播给其他订阅者,但仍能很快地广播内容??

【问题讨论】:

  • 听起来你需要在它自己的线程中触发事件。
  • 100ns 大约是 300 条指令。祝你以这样的速度获得跨线程同步。
  • 我不明白您如何无法控制订阅者的代码。其他程序可以订阅事件吗?
  • @Blam 如果您正在编写将由各种不同类型的消费者导入的库代码,每个都添加带有任意代码的事件处理程序......

标签: c# .net multithreading events asynchronous


【解决方案1】:

100 ns 是一个很难达到的目标。我相信这需要深入了解您正在做什么以及为什么要达到这种性能。

但是,异步调用事件订阅者很容易解决。 其他人 Jon Skeet 已经回答了 here

foreach (MyDelegate action in multicast.GetInvocationList())
{
    action.BeginInvoke(...);
}

编辑: 我还应该提到,您需要在 real-time operating system 上运行才能为您的用户提供严格的性能保证。

【讨论】:

  • 如果您发现另一个重复的问题,您应该标记/投票关闭,而不是发布带有链接的答案。
  • @Servy 另一个问题没有解决时间限制
  • 你的答案也没有。
  • +1,确实如此。 100 ns 确实是一个非常难以用 BeginInvoke() 实现的目标。只是上下文切换已经是几千个cpu周期了:)
【解决方案2】:

您似乎正在寻找任务。以下是我为我的工作编写的扩展方法,它可以异步调用事件,以便每个事件处理程序都在它们自己的线程上。我无法评论它的速度,因为这对我来说从来都不是要求。


更新

根据 cmets 我对其进行了调整,以便只创建一个任务来调用所有订阅者

/// <summary>
/// Extension method to safely encapsulate asynchronous event calls with checks
/// </summary>
/// <param name="evnt">The event to call</param>
/// <param name="sender">The sender of the event</param>
/// <param name="args">The arguments for the event</param>
/// <param name="object">The state information that is passed to the callback method</param>
/// <remarks>
/// This method safely calls the each event handler attached to the event. This method uses <see cref="System.Threading.Tasks"/> to
/// asynchronously call invoke without any exception handling. As such, if any of the event handlers throw exceptions the application will
/// most likely crash when the task is collected. This is an explicit decision since it is really in the hands of the event handler
/// creators to make sure they handle issues that occur do to their code. There isn't really a way for the event raiser to know
/// what is going on.
/// </remarks>
[System.Diagnostics.DebuggerStepThrough]
public static void AsyncSafeInvoke( this EventHandler evnt, object sender, EventArgs args )
{
    // Used to make a temporary copy of the event to avoid possibility of
    // a race condition if the last subscriber unsubscribes
    // immediately after the null check and before the event is raised.
    EventHandler handler = evnt;
    if (handler != null)
    {
        // Manually calling all event handlers so that we could capture and aggregate all the
        // exceptions that are thrown by any of the event handlers attached to this event.  
        var invocationList = handler.GetInvocationList();

        Task.Factory.StartNew(() =>
        {
            foreach (EventHandler h in invocationList)
            {
                // Explicitly not catching any exceptions. While there are several possibilities for handling these 
                // exceptions, such as a callback, the correct place to handle the exception is in the event handler.
                h.Invoke(sender, args);
            }
        });
    }
}

【讨论】:

  • 奇怪的是他不需要在他们自己的线程中调用每个处理程序,而是在一个新线程中调用所有处理程序。这将显着减少开销。
  • +1 适用于广泛的 cmets,但是,这并不能解决他的 100 ns 限制
  • 这里明确复制委托是没有意义的;它在传递给方法时被复制。
  • 我的直觉是他确实想要异步调用每个处理程序,以便 1 个不良消费者不会延迟向其他消费者发送消息。
  • 关于 null 检查事件 - 我发现最好为每个委托订阅一个空处理程序,以避免所有极其微妙的竞争条件。 public event EventHandler MyEvent += ()=&gt;{}; 防止潜在的危险。
【解决方案3】:

您可以在事件处理程序上使用这些简单的扩展方法:

public static void Raise<T>(this EventHandler<T> handler, object sender, T e) where T : EventArgs {
    if (handler != null) handler(sender, e);
}

public static void Raise(this EventHandler handler, object sender, EventArgs e) {
    if (handler != null) handler(sender, e);
}

public static void RaiseOnDifferentThread<T>(this EventHandler<T> handler, object sender, T e) where T : EventArgs {
    if (handler != null) Task.Factory.StartNewOnDifferentThread(() => handler.Raise(sender, e));
}

public static void RaiseOnDifferentThread(this EventHandler handler, object sender, EventArgs e) {
    if (handler != null) Task.Factory.StartNewOnDifferentThread(() => handler.Raise(sender, e));
}

public static Task StartNewOnDifferentThread(this TaskFactory taskFactory, Action action) {
    return taskFactory.StartNew(action: action, cancellationToken: new CancellationToken());
}

用法:

public static Test() {
     myEventHandler.RaiseOnDifferentThread(null, EventArgs.Empty);
}

cancellationToken 是保证StartNew() 实际使用不同线程所必需的,正如here 所解释的那样。

【讨论】:

    【解决方案4】:

    我无法确定这是否能够可靠地满足 100ns 的要求,但这里有一个替代方案,您可以为最终用户提供一种方法,为您提供一个您将填充的 ConcurrentQueue,他们可以在单独的线程上收听.

    class Program
    {
        static void Main(string[] args)
        {
            var multicaster = new QueueMulticaster<int>();
    
            var listener1 = new Listener(); //Make a couple of listening Q objects. 
            listener1.Listen();
            multicaster.Subscribe(listener1);
    
            var listener2 = new Listener();
            listener2.Listen();
            multicaster.Subscribe(listener2);
    
            multicaster.Broadcast(6); //Send a 6 to both concurrent Queues. 
            Console.ReadLine();
        }
    }
    
    //The listeners would run on their own thread and poll the Q like crazy. 
    class Listener : IListenToStuff<int>
    {
        public ConcurrentQueue<int> StuffQueue { get; set; }
    
        public void Listen()
        {
            StuffQueue = new ConcurrentQueue<int>();
            var t = new Thread(ListenAggressively);
            t.Start();
    
        }
    
        void ListenAggressively()
        {
            while (true)
            {
                int val;
                if(StuffQueue.TryDequeue(out val))
                    Console.WriteLine(val);
            }
        }
    }
    
    //Simple class that allows you to subscribe a Queue to a broadcast event. 
    public class QueueMulticaster<T>
    {
        readonly List<IListenToStuff<T>> _subscribers = new List<IListenToStuff<T>>();
        public void Subscribe(IListenToStuff<T> subscriber)
        {
            _subscribers.Add(subscriber);
        }
        public void Broadcast(T value)
        {
            foreach (var listenToStuff in _subscribers)
            {
                listenToStuff.StuffQueue.Enqueue(value);
            }
        }
    }
    
    public interface IListenToStuff<T>
    {
        ConcurrentQueue<T> StuffQueue { get; set; }
    }
    

    鉴于您无法阻止其他侦听器的处理,这意味着多个线程。在侦听器上设置专用侦听线程似乎是一种合理的尝试方法,并发队列似乎是一种不错的交付机制。在这个实现中,它只是不断地轮询,但您可以使用线程信号来减少 cpu 负载,使用类似AutoResetEvent 的东西。

    【讨论】:

    【解决方案5】:

    信号和共享内存非常快。您可以发送一个单独的信号来告诉应用程序从共享内存位置读取消息。当然,如果您想要低延迟,信号仍然是您的应用程序必须在高优先级线程上使用的事件。我会在数据中包含一个时间标签,以便接收器可以补偿不可避免的延迟。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-04-24
      • 1970-01-01
      • 2021-12-09
      • 2011-09-30
      • 2011-04-07
      • 1970-01-01
      相关资源
      最近更新 更多