【问题标题】:What is the TPL equivalent of rx's Observable.FromEventPattern?rx 的 Observable.FromEventPattern 的 TPL 等价物是什么?
【发布时间】:2012-03-14 20:33:54
【问题描述】:

在 rx 中你可以写:

var oe = Observable.FromEventPattern<SqlNotificationEventArgs>(sqlDep, "OnChange");

然后订阅observable,将sqlDep对象上的OnChange事件转化为observable。

同样,如何使用任务并行库从 C# 事件创建任务?

编辑:澄清 Drew 指出然后由 user375487 明确编写的解决方案适用于单个事件。任务一完成……嗯,就完成了。

observable 事件可以随时再次触发。它可以看作是一个可观察的流。 TPL 数据流中的一种 ISourceBlock。但是在文档http://msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx 中没有 ISourceBlock 的示例。

我最终找到了一个论坛帖子,解释了如何做到这一点:http://social.msdn.microsoft.com/Forums/en/tpldataflow/thread/a10c4cb6-868e-41c5-b8cf-d122b514db0e

公共静态 ISourceBlock CreateSourceBlock( Action,Action,Action,ISourceBlock> 执行器) { var bb = new BufferBlock(); 执行者(t => bb.Post(t), () => bb.Complete(), e => bb.Fault(e), bb); 返回bb; }

//Remark the async delegate which defers the subscription to the hot source.
var sourceBlock = CreateSourceBlock<SomeArgs>(async (post, complete, fault, bb) =>
{
    var eventHandlerToSource = (s,args) => post(args);
    publisher.OnEvent += eventHandlerToSource;
    bb.Complete.ContinueWith(_ => publisher.OnEvent -= eventHandlerToSource);
});

我没有尝试过上面的代码。异步委托和 CreateSourceBlock 的定义可能不匹配。

【问题讨论】:

    标签: task-parallel-library system.reactive tpl-dataflow


    【解决方案1】:

    没有直接等效于 TPL 中的事件异步模式 (EAP)。您需要做的是使用 TaskCompletionSource&lt;T&gt; 在事件处理程序中向自己发出信号。 Check out this section on MSDN 是一个使用 WebClient::DownloadStringAsync 来演示模式的示例。

    【讨论】:

    • 感谢您的链接!但这不适用于一系列事件,仅适用于一个事件。我澄清了这个问题。
    • 明白了。所以我很困惑你是否已经想出 TPL Dataflow 的方式来做到这一点。如果你还没有,我可能会拼凑一个样本。 ISourceBlock 绝对是正确的方法。告诉我。
    【解决方案2】:

    您可以使用 TaskCompletionSource。

    public static class TaskFromEvent
    {
        public static Task<TArgs> Create<TArgs>(object obj, string eventName)
            where TArgs : EventArgs
        {
            var completionSource = new TaskCompletionSource<TArgs>();
            EventHandler<TArgs> handler = null;
    
            handler = new EventHandler<TArgs>((sender, args) =>
            {
                completionSource.SetResult(args);
                obj.GetType().GetEvent(eventName).RemoveEventHandler(obj, handler);
            });
    
            obj.GetType().GetEvent(eventName).AddEventHandler(obj, handler);
            return completionSource.Task;
        }
    }
    

    示例用法:

    public class Publisher
    {
        public event EventHandler<EventArgs> Event;
    
        public void FireEvent()
        {
            if (this.Event != null)
                Event(this, new EventArgs());
        }
    }
    
    class Program
    {
        static void Main(string[] args)
        {
            Publisher publisher = new Publisher();
            var task = TaskFromEvent.Create<EventArgs>(publisher, "Event").ContinueWith(e => Console.WriteLine("The event has fired."));
            publisher.FireEvent();
            Console.ReadKey();
        }
    }
    

    编辑根据您的说明,下面是一个示例,说明如何使用 TPL DataFlow 实现您的目标。

    public class EventSource
    {
        public static ISourceBlock<TArgs> Create<TArgs>(object obj, string eventName)
            where TArgs : EventArgs
        {
            BufferBlock<TArgs> buffer = new BufferBlock<TArgs>();
            EventHandler<TArgs> handler = null;
    
            handler = new EventHandler<TArgs>((sender, args) =>
            {
                buffer.Post(args);
            });
    
            buffer.Completion.ContinueWith(c =>
                {
                    Console.WriteLine("Unsubscribed from event");
                    obj.GetType().GetEvent(eventName).RemoveEventHandler(obj, handler);
                });
    
            obj.GetType().GetEvent(eventName).AddEventHandler(obj, handler);
            return buffer;
        }
    }
    
    public class Publisher
    {
        public event EventHandler<EventArgs> Event;
    
        public void FireEvent()
        {
            if (this.Event != null)
                Event(this, new EventArgs());
        }
    }
    
    class Program
    {
        static void Main(string[] args)
        {
            var publisher = new Publisher();
            var source = EventSource.Create<EventArgs>(publisher, "Event");
            source.LinkTo(new ActionBlock<EventArgs>(e => Console.WriteLine("New event!")));
            Console.WriteLine("Type 'q' to exit");
            char key = (char)0;
            while (true)
            {
                key = Console.ReadKey().KeyChar;             
                Console.WriteLine();
                if (key == 'q') break;
                publisher.FireEvent();
            }
    
            source.Complete();
            Console.ReadKey();
        }
    }
    

    【讨论】:

    • 感谢您的样品!但这不适用于一系列事件,仅适用于一个事件。我澄清了这个问题。
    • 好的,我猜你自己已经找到了答案。以防万一,首先不使用 Rx 的原因是什么?
    • 嗯,我不熟悉 rx,也不熟悉 TPL。在阅读了两个 rx 的文档后,起初似乎更简单。但是在尝试在现实世界中使用它之后,我确信相反。一些核心的 Rx 方法名称不容易记住,而且这些名称不会自己说话。流畅的方法看起来不错,但它并不像看起来那么流畅。它可能与 TPL 一样或更强大,但......现在它只是不适合。
    • 修改了我对数据流案例的回答。
    猜你喜欢
    • 1970-01-01
    • 2014-06-12
    • 1970-01-01
    • 2022-11-28
    • 2014-05-08
    • 2018-07-10
    • 2023-04-10
    • 2011-01-19
    • 2010-12-07
    相关资源
    最近更新 更多