【问题标题】:Enforcing one async observable at a time一次强制执行一个异步可观察对象
【发布时间】:2013-12-22 15:55:25
【问题描述】:

我正在尝试使用Observable.FromAsync 将一些 TPL 异步集成到更大的 Rx 链中,就像在这个小例子中一样:

using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace rxtest
{
    class Program
    {
        static void Main(string[] args)
        {
            MainAsync().Wait();
        }

        static async Task MainAsync()
        {
            await Observable.Generate(new Random(), x => true,
                                      x => x, x => x.Next(250, 500))
                .SelectMany((x, idx) => Observable.FromAsync(async ct =>
                {
                    Console.WriteLine("start:  " + idx.ToString());
                    await Task.Delay(x, ct);
                    Console.WriteLine("finish: " + idx.ToString());
                    return idx;
                }))
                .Take(10)
                .LastOrDefaultAsync();
        }
    }
}

但是,我注意到这似乎会同时启动所有异步任务,而不是一次执行一个,这会导致应用程序的内存使用量激增。 SelectMany 的行为似乎与 Merge 没有什么不同。

在这里,我看到这样的输出:

start:  0
start:  1
start:  2
...

我想看看:

start:  0
finish: 0
start:  1
finish: 1
start:  2
finish: 2
...

我怎样才能做到这一点?

【问题讨论】:

  • 所以你想用并行库编写串行代码?
  • @AustinSalonen 这些工具旨在处理异步代码,而不仅仅是并行代码。并非所有异步代码都需要并行化。他在这里使用了完全合适的工具。
  • @Servy:我真的只是想看看这里到底发生了什么。您可以简单地阻止 FromAsync 调用来满足这种情况。这可能不是正确,但看起来这就是他想要的。
  • @AustinSalonen 这不会保持异步,并且不恰当地使用这些工具。鉴于 OP 正在使用这些工具,显然他们希望找到一个异步解决方案来解决这个问题。为这个问题找到一个完全同步的解决方案非常简单。
  • @AustinSalonen 如果您认为某个问题不够清楚,要求澄清比假设对方不知​​道自己在做什么而留下尖刻评论更有效。

标签: c# .net task-parallel-library system.reactive


【解决方案1】:

SelectMany 更改为SelectConcat

    static async Task MainAsync()
    {
        await Observable.Generate(new Random(), x => true,
                                  x => x, x => x.Next(250, 500))
            .Take(10)
            .Select((x, idx) => Observable.FromAsync(async ct =>
            {
                Console.WriteLine("start:  " + idx.ToString());
                await Task.Delay(x, ct);
                Console.WriteLine("finish: " + idx.ToString());
                return idx;
            }))
            .Concat()
            .LastOrDefaultAsync();
    }

编辑 - 我将 Take(10) 移到链上,因为 Generate 不会阻塞 - 所以它阻止了这个逃跑。

Select 将每个事件投射到一个流中,该流表示将从订阅开始的异步任务。 Concat 接受一个流,并在前一个完成后订阅每个后续的子流,将所有流连接成一个平面流。

【讨论】:

  • 编辑添加解释,加上一个小优化,以防止生成狂奔。
  • 或者,.Take(1).Repeat()
  • 其实应该是.Take(1).Repeat().Take(10),有点傻。
  • 搞定 - 如果你想要某种程度的并行性,请在此处使用 Merge(anInt) 而不是 Concat
猜你喜欢
  • 2017-05-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-04-23
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多