【问题标题】:Rising events without blocking and receiving events in the right order上升事件而不阻塞并以正确的顺序接收事件
【发布时间】:2015-10-07 13:18:53
【问题描述】:

这需要先解释一下。有一个工作线程必须引发一些事件:

Task.Run(() =>
{
    for(int i = 0; i < 123456789; i++)
    {
        ... // some job
        OnSomeEvent(i);
    }
});

同步上升事件将阻塞作业,直到所有事件处理程序完成:

void OnSomeEvent(int i) => SomeEvent?.Invoke(this, new SomeEventArgs(i));

异步事件上升不会再阻塞作业(耶!)

void OnSomeEvent(int i) => Task.Run(() => SomeEvent?.Invoke(this, new SomeEventArgs(i)));

但是现在还有另一个问题:事件没有按正确的顺序接收:

OnSomeEvent(1);
OnSomeEvent(2);
OnSomeEvent(3);
...

// event handler
SomeEvent += (s, e) => Console.WriteLine(e.I);

// possible output
1
3
2

问题:如何实现以正确顺序发生的异步事件?

最近我学到了什么Dispatcher.InvokeAsyncuses queue。看来我必须做类似的事情。如果我必须这样做:1)它应该是调用者的工作还是 2)我应该保持同步上升事件并且接收者必须组织生产者/消费者以防止工作阻塞?或者也许还有其他方法?

P.S.:这与ContinueWhith.. 无关,除非存储任务列表是一个正确的解决方案。我关心的是如何在以下情况下实现即发即弃事件:a) 调用者未被阻止 2) 事件以相同的顺序接收。

P.P.S.:我不知道如何让 MCVE 重现问题。它出现在实际项目中,UI 很重,线程很多。

【问题讨论】:

  • 也许你想使用一个先进先出的队列而不是使用事件,一旦准备好并处理它,就简单地退出队列。
  • @YuvalItzchakov,这意味着当有东西被添加到队列时触发一些事件。来电者必须有队列和上升事件。订阅者(可能很多)将收到事件并做什么?

标签: c# multithreading events asynchronous


【解决方案1】:

您可以使用以下TaskQueue 将异步操作添加到队列中,以便在队列中的前一个项目完成时启动每个操作:

public class TaskQueue
{
    private Task previous = Task.FromResult(false);
    private object key = new object();

    public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        lock (key)
        {
            var next = previous.ContinueWith(t => taskGenerator()).Unwrap();
            previous = next;
            return next;
        }
    }
    public Task Enqueue(Func<Task> taskGenerator)
    {
        lock (key)
        {
            var next = previous.ContinueWith(t => taskGenerator()).Unwrap();
            previous = next;
            return next;
        }
    }
}

这允许你写:

private TaskQueue taskQueue = new TaskQueue();
private void OnSomeEvent(int i) => 
    taskQueue.Enqueue(() => Task.Run(() => SomeEvent?.Invoke(this, new SomeEventArgs(i))));

【讨论】:

  • 看起来很有希望,谢谢。在不阻塞调用者的情况下将任务链接到ContinueWith 链中。最重要的是 TaskQueue 可重用于其他有序异步事件。
  • @Sinatr 就是这样,是的。实际上,我遇到过很多这样的队列很有用的情况,所以它并不是一个特别独特的需求。
【解决方案2】:

您可以使用TPL Dataflow 中的ActionBlock 队列来维护事件队列。

您将按如下方式创建队列:

queue = new ActionBlock<SomeEventArgs>(item => SomeEvent?.Invoke(item));

然后你会像这样将事件添加到队列中:

queue.Post(new SomeEventArgs(value));

当不再需要队列时,您可以这样做:

queue.Complete();

之后,如果您需要等到仍在队列中的任何项目被处理,您可以这样做:

queue.Completion.Wait();

但是,请注意queue.Completion 实际上是Task,因此您经常将它与await 一起使用。

这是一个完整的示例,展示了一种方法(它不会让线程一直保持活动状态,只是为了处理事件队列):

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    public class SomeEventArgs : EventArgs
    {
        public SomeEventArgs(int value)
        {
            Value = value;
        }

        public int Value { get; }
    }

    internal class Program
    {
        public delegate void SomeEventHandler(SomeEventArgs e);

        public event SomeEventHandler SomeEvent;

        ActionBlock<SomeEventArgs> queue;

        private void run()
        {
            queue = new ActionBlock<SomeEventArgs>(item => SomeEvent?.Invoke(item));

            // Subscribe to my own event (this just for demonstration purposes!)

            this.SomeEvent += Program_SomeEvent;

            // Raise 100 events.

            for (int i = 0; i < 100; ++i)
            {
                OnSomeEvent(i);
                Console.WriteLine("Raised event " + i);
            }

            Console.WriteLine("Signalling that queue is complete.");
            queue.Complete();

            Console.WriteLine("Waiting for queue to be processed.");
            queue.Completion.Wait();

            Console.WriteLine("Done.");
        }

        private void Program_SomeEvent(SomeEventArgs e)
        {
            Console.WriteLine("Handled " + e.Value);
            Thread.Sleep(1); // Simulate load.
        }

        private void OnSomeEvent(int value)
        {
            queue.Post(new SomeEventArgs(value));
        }

        private static void Main()
        {
            new Program().run();
        }
    }
}

【讨论】:

  • 这需要有一个完整的线程池线程,专门用于在整个对象生命周期内(对于每个对象,如果有多个对象)仅泵送此队列。这是一个非常糟糕的主意,尤其是当它完全没有必要时。线程池是为短期操作而设计的,不是很长时间的操作,创建一个新的专用线程会占用大量资源,并且如果该对象有很多实例,则无法很好地扩展。
  • 实际上,更好的队列来自 TPL。我会考虑的。
  • @Servy 我们开始了 - 使用 TPL 的版本,不需要专门的线程。
猜你喜欢
  • 2019-03-07
  • 1970-01-01
  • 2014-08-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多