【问题标题】:Convert IEnumerable<Task<T>> to IObservable<T>将 IEnumerable<Task<T>> 转换为 IObservable<T>
【发布时间】:2013-06-07 17:06:54
【问题描述】:

我正在尝试使用响应式扩展 (Rx) 来缓冲任务完成时的枚举。有谁知道是否有一种干净的内置方法可以做到这一点? ToObservable 扩展方法只会生成一个IObservable&lt;Task&lt;T&gt;&gt;,这不是我想要的,我想要一个IObservable&lt;T&gt;,然后我可以使用Buffer

人为的例子:

//Method designed to be awaitable
public static Task<int> makeInt()
{
     return Task.Run(() => 5);
}

//In practice, however, I don't want to await each individual task
//I want to await chunks of them at a time, which *should* be easy with Observable.Buffer 
public static void Main() 
{
    //Make a bunch of tasks
    IEnumerable<Task<int>> futureInts = Enumerable.Range(1, 100).Select(t => makeInt());

    //Is there a built in way to turn this into an Observable that I can then buffer?
    IObservable<int> buffered = futureInts.TasksToObservable().Buffer(15); //????

    buffered.Subscribe(ints => {
        Console.WriteLine(ints.Count()); //Should be 15
    });
}

【问题讨论】:

标签: c# .net task-parallel-library system.reactive


【解决方案1】:

您可以使用 Task 可以使用 another overload of ToObservable() 转换为 observable 的事实。

当您拥有(单个项目)可观察对象的集合时,您可以使用 Merge() 创建一个包含完成的项目的单个可观察对象。

因此,您的代码可能如下所示:

futureInts.Select(t => t.ToObservable())
          .Merge()
          .Buffer(15)
          .Subscribe(ints => Console.WriteLine(ints.Count));

【讨论】:

  • 有没有一种简单的方法可以将Task&lt;IEnumerable&lt;T&gt;&gt; 转换为IObservable&lt;T&gt;,而Task&lt;T&gt; 的结果是IObservable&lt;T&gt; 中的结果流?
  • @JonathanPeel 我认为task.ToObservable().SelectMany(x =&gt; x) 应该可以工作。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多