【问题标题】:Fetching data based on stream基于流获取数据
【发布时间】:2015-09-23 12:44:20
【问题描述】:

我们有一个通知数据更改的源,当一个项目进入时,我们异步获取新数据。

source.SelectMany(async n => { await FetchData()});

在等待加载数据时,可能会收到许多通知,但我们希望忽略除 1 之外的所有通知,这样我们就不会为每个通知都获取数据,而是只获取一次。
在获取数据之前,我们如何忽略来自源的所有通知,除了 1?

我感觉解决方案将涉及将 FetchData() 转换为 IObservable,但我仍然不知道什么 Rx 原语可以让我们组合流。

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    看起来像是一个非常经典(但缺少)Rx 运算符的用例:ObserveLatestOn(示例实现 here,但您可以在网络上找到其他人)。

    source.ObserveLatestOn(TimeSpan.Zero, Schedulers.NewThread).SelectMany(async n => { await FetchData()})
    

    请注意,此实现仅在单线程调度程序上进行了测试(主要是 UI,但将与 NewThread 一起使用),而不是Immediate/CurrentThread(可能有效)或TaskPool(可能有竞争条件)

    还请注意,您在这里遇到的问题是 Rx.Net 中缺少反应性拉动 backpressure(讨论中 here),RxJava 对这种情况有很好的背压支持(例如 onBackpressureLatest

    【讨论】:

      【解决方案2】:

      我确信有一种使用 Rx 的方法,但我想到的一个简单解决方案是使用 AsyncAutoResetEvent(AutoResetEvent 的异步版本)。

      基本上,您创建一个循环,该循环异步等待您的 AsyncAutoResetEvent 设置,该设置在收到新通知时完成。自动重置确保在下一次等待时,您将被异步阻止,直到收到新通知。

      您可以在 Stephen Cleary AsyncEx 创建的优秀库中找到作为 Nuget 包的 AsyncAutoResetEvent 类。

      这是一个简单的程序,展示了建议的解决方案的实际效果:

      class Program
      {
          static readonly AsyncAutoResetEvent _resetEvent = new AsyncAutoResetEvent(); 
      
          static void Main(string[] args)
          {
              // Start the asynchronous fetching loop...
              RunAsync();
      
              Task.Run(async () =>
              {
                  // Simulate fast notifications 
                  for (int i = 0; i < 15; i++)
                  {
                      OnNotification(i);
                      await Task.Delay(100);
                  }
      
                  // Simulate a pause of notifications 
                  await Task.Delay(2000);
      
                  // Simulate fast notifications 
                  for (int i = 0; i < 15; i++)
                  {
                      OnNotification(i);
                      await Task.Delay(100);
                  }
              });
      
              Console.ReadKey();
          }
      
          static void OnNotification(int index)
          {
              Console.WriteLine(DateTime.Now.ToLongTimeString() + " OnNotification " + index);
      
              // This will unlock the current or next WaitAsync on the _resetEvent
              _resetEvent.Set();
          }
      
      
          static async Task RunAsync()
          {
              // Uncomment this if you want to wait for a first notification before fetching.
              // await _resetEvent.WaitAsync();    
      
              while (true)
              {
                  Console.WriteLine(DateTime.Now.ToLongTimeString() + " Fetching...");
      
                  // Simulate long fetching
                  await Task.Delay(1000);
      
                  // Wait for a new notification before doing another fetch
                  await _resetEvent.WaitAsync();    
              }
          }
      }
      

      这是输出:

      12:04:51 PM Fetching...
      12:04:51 PM OnNotification 0
      12:04:52 PM OnNotification 1
      12:04:52 PM OnNotification 2
      12:04:52 PM OnNotification 3
      12:04:52 PM OnNotification 4
      12:04:52 PM OnNotification 5
      12:04:52 PM OnNotification 6
      12:04:52 PM OnNotification 7
      12:04:52 PM OnNotification 8
      12:04:52 PM OnNotification 9
      12:04:52 PM Fetching...
      12:04:53 PM OnNotification 10
      12:04:53 PM OnNotification 11
      12:04:53 PM OnNotification 12
      12:04:53 PM OnNotification 13
      12:04:53 PM OnNotification 14
      12:04:53 PM Fetching...
      12:04:55 PM OnNotification 0
      12:04:55 PM Fetching...
      12:04:55 PM OnNotification 1
      12:04:55 PM OnNotification 2
      12:04:55 PM OnNotification 3
      12:04:56 PM OnNotification 4
      12:04:56 PM OnNotification 5
      12:04:56 PM OnNotification 6
      12:04:56 PM OnNotification 7
      12:04:56 PM OnNotification 8
      12:04:56 PM OnNotification 9
      12:04:56 PM Fetching...
      12:04:56 PM OnNotification 10
      12:04:56 PM OnNotification 11
      12:04:56 PM OnNotification 12
      12:04:57 PM OnNotification 13
      12:04:57 PM OnNotification 14
      12:04:57 PM Fetching...
      

      【讨论】:

      • 我们正在 Rx 中寻找解决方案,因为我们正在使用其他 Rx 转换处理输入和输出上的 IObservables。
      猜你喜欢
      • 1970-01-01
      • 2016-04-28
      • 1970-01-01
      • 2020-04-25
      • 1970-01-01
      • 1970-01-01
      • 2021-09-16
      • 2019-02-13
      • 2018-07-16
      相关资源
      最近更新 更多