【问题标题】:RX: How to concat a Snapshot stream and an Update stream?RX:如何连接快照流和更新流?
【发布时间】:2013-06-18 00:58:57
【问题描述】:

我一直在尝试创建一个 observable,它从存储库缓存中流式传输世界状态(快照),然后从单独的提要中进行实时更新。问题是快照调用是阻塞的,因此必须在此期间缓冲更新。

这是我想出的,稍微简化了一点。 GetStream() 方法是我关心的方法。我想知道是否有更优雅的解决方案。假设 GetDataFeed() 全天对缓存进行更新。

private static readonly IConnectableObservable<long> _updateStream;

public static Constructor()
{
      _updateStream = GetDataFeed().Publish();
      _updateStream.Connect();
}

static void Main(string[] args)
{
      _updateStream.Subscribe(Console.WriteLine);
      Console.ReadLine();
      GetStream().Subscribe(l => Console.WriteLine("Stream: " + l));
      Console.ReadLine();
}

public static IObservable<long> GetStream()
{
      return Observable.Create<long>(observer =>
            {
                  var bufferedStream = new ReplaySubject<long>();
                  _updateStream.Subscribe(bufferedStream);
                  var data = GetSnapshot();
                  // This returns the ticks from GetSnapshot
                  //  followed by the buffered ticks from _updateStream
                  //  followed by any subsequent ticks from _updateStream
                  data.ToObservable().Concat(bufferedStream).Subscribe(observer);

                  return Disposable.Empty;
            });
}

private static IObservable<long> GetDataFeed()
{
      var feed = Observable.Interval(TimeSpan.FromSeconds(1));
      return Observable.Create<long>(observer =>
      {
            feed.Subscribe(observer);
            return Disposable.Empty;
      });
}

大众意见反对主题,因为它们不是“功能性”的,但如果没有 ReplaySubject,我找不到这样做的方法。热可观察对象上的重播过滤器不起作用,因为它会重播所有内容(可能是一整天的陈旧更新)。

我也担心比赛条件。有没有办法保证某种排序,是否应该在快照之前缓冲较早的更新?与其他 RX 操作员一起,整个事情可以更安全、更优雅地完成吗?

谢谢。

-会

【问题讨论】:

    标签: c# repository-pattern system.reactive


    【解决方案1】:

    无论您使用ReplaySubject 还是Replay 函数真的没有区别。 Replay 在引擎盖下使用 ReplaySubject。我还会注意到您正在疯狂地泄漏订阅,这可能会导致资源泄漏。此外,您对回放缓冲区的大小没有任何限制。如果您整天观看 observable,那么回放缓冲区将不断增长。你应该限制它以防止这种情况发生。

    这是GetStream 的更新版本。在这个版本中,我采用了将Replay 限制为最近 1 分钟数据的简单方法。这假设GetData 将始终完成并且观察者将在那一分钟内观察结果。您的里程可能会有所不同,您可能会改进此方案。但至少这样,当您整天观看 observable 时,该缓冲区不会无限增长,并且仍然只会包含一分钟的更新。

    public static IObservable<long> GetStream()
    {
        return Observable.Create<long>(observer =>
        {
            var updateStreamSubscription = new SingleAssignmentDisposable();
            var sequenceDisposable = new SingleAssignmentDisposable();
            var subscriptions = new CompositeDisposable(updateStreamDisposable, sequenceDisposable);
    
            // start buffering the updates
            var bufferedStream = _updateStream.Replay(TimeSpan.FromMinutes(1));
            updateStreamSubscription.Disposable = bufferedStream.Connect();
    
            // now retrieve the initial snapshot data
            var data = GetSnapshot();
    
            // subscribe to the snapshot followed by the buffered data
            sequenceDisposable.Disposable = data.ToObservable().Concat(bufferedStream).subscribe(observer);
    
            // return the composite disposable which will unsubscribe when the observer wishes
            return subscriptions;
        });
    }
    

    至于您关于竞争条件和过滤“旧”更新的问题...如果您的快照数据包含某种版本信息,并且您的更新流还提供版本信息,那么您可以有效地衡量由返回的最新版本您的快照查询,然后过滤缓冲流以忽略旧版本的值。这是一个粗略的例子:

    public static IObservable<long> GetStream()
    {
        return Observable.Create<long>(observer =>
        {
            var updateStreamSubscription = new SingleAssignmentDisposable();
            var sequenceDisposable = new SingleAssignmentDisposable();
            var subscriptions = new CompositeDisposable(updateStreamDisposable, sequenceDisposable);
    
            // start buffering the updates
            var bufferedStream = _updateStream.Replay(TimeSpan.FromMinutes(1));
            updateStreamSubscription.Disposable = bufferedStream.Connect();
    
            // now retrieve the initial snapshot data
            var data = GetSnapshot();
    
            var snapshotVersion = data.Length > 0 ? data[data.Length - 1].Version : 0;
            var filteredUpdates = bufferedStream.Where(update => update.Version > snapshotVersion);
    
            // subscribe to the snapshot followed by the buffered data
            sequenceDisposable.Disposable = data.ToObservable().Concat(filteredUpdates).subscribe(observer);
    
            // return the composite disposable which will unsubscribe when the observer wishes
            return subscriptions;
        });
    }
    

    在将实时更新与存储的快照合并时,我已成功使用此模式。我还没有找到一个优雅的 Rx 运算符,它已经在没有任何竞争条件的情况下做到了这一点。但是上面的方法可能会变成这样。 :)

    编辑:请注意,我在上面的示例中省略了错误处理。从理论上讲,对GetSnapshot 的调用可能会失败,并且您会将订阅泄露到更新流。我建议将 CompositeDisposable 声明之后的所有内容包装在 try/catch 块中,并在 catch 处理程序中,确保在重新抛出异常之前调用 subscriptions.Dispose()

    【讨论】:

    • 这正是我想要的。我没有意识到你可以在订阅的那一刻打电话给 Replay!当您说我正在泄漏订阅时,是否可以通过将流包装到 CompositeDisposable 中来解决?谢谢!
    • 通过泄漏订阅,我的意思是您调用了Subscribe,但忽略了返回的IDisposable,因此无法取消订阅该流。一般来说,当您使用Observable.Create 创建一个可观察对象时,您希望确保在您返回的一次性用品被处置时,您所做的任何订阅调用都被正确取消订阅。在这种情况下,我们有 2 次订阅调用(嗯,1 次订阅和 1 次连接),因此我们要确保两个一次性用品都已处置。 CompositeDisposables 非常适合这个目的。
    猜你喜欢
    • 2021-09-02
    • 1970-01-01
    • 1970-01-01
    • 2021-04-02
    • 2020-09-26
    • 2021-07-02
    • 2019-05-17
    • 1970-01-01
    • 2019-03-10
    相关资源
    最近更新 更多