【问题标题】:Using Reactive Extensions to stream model changes使用 Reactive Extensions 流式传输模型更改
【发布时间】:2014-07-31 12:48:53
【问题描述】:

我正在开发一个服务器组件,该组件负责在内存中缓存模型,然后将任何更改流式传输到感兴趣的客户端。

当第一个客户端请求一个模型(模型密钥,每个模型都有一个密钥来识别它)时,模型将被创建(以及对下游系统的任何订阅),然后发送给客户端,然后是一个流更新(由下游系统生成)。任何后续客户端都应该再次使用更新流获取此缓存(更新)模型。当最后一个客户端取消订阅模型时,下游订阅应该被销毁并且缓存的模型被销毁。

关于 Rx 如何在这里提供帮助,谁能指出我正确的方向。我想目前我不清楚的是我如何同步(对象的)状态和变化流?我会有两个单独的 IObservable 用于模型和更新吗?

更新:这是我目前所拥有的:

        Model model = null;

        return Observable.Create((IObserver<ModelUpdate> observer) =>
        {
            model = _modelFactory.GetModel(key);
            _backendThing.Subscribe(model, observer.OnNext);
            return Disposable.Create(() =>
            {
                _backendThing.Unsubscribe(model);
            });
        })
            .Do((u) => model.MergeUpdate(u))
            .Buffer(_bufferLength)
            .Select(inp => new ModelEvent(inp))
            .Publish()
            .RefCount()
            .StartWith(new ModelEvent(model)

【问题讨论】:

  • 如果有一个模型对象并且对象的生命周期很长,那么我建议每个模型都封装一个可以流式传输更改的 Observable。
  • 那么会有很多模型对象。理想情况下,我想要一种可以订阅所有模型更新或特定模型更新的方式
  • @nzyme 在您最新的代码示例中,不要使用StartWith,只需调用observer.OnNext(new ModelUpdate(model)) 之前订阅_backendThing
  • @nzyme 另外,关闭Observable.Create 中的变量是一种代码异味。在您的特定情况下,这在技术上没有问题,因为您使用的是Publish().RefCount(),但如果可能的话,我可能会避免它。例如,ModelUpdate 应该公开一个 Model 属性,或者如果你不能这样做,那么定义一个结构,如:struct ModelNotification { ModelUpdate Update { get; } Model Model { get; } }

标签: system.reactive reactive-programming


【解决方案1】:

如果我正确理解了问题,就会有Models 动态进入。在应用程序生命周期的任何时间点,模型的数量都是未知的。

为此,IObservable&lt;IEnumerable&lt;Model&gt;&gt; 看起来像是一条路。每次添加新模型或删除现有模型时,都会流式传输更新的 IEnumerable&lt;Model&gt;。现在它基本上会保留较旧的对象,而不是在每次有更新时创建所有模型,除非有充分的理由这样做。

至于每个Model 对象状态的更新,例如任何字段值或属性值更改,我会查看Paul Betts 的ReactiveUI 项目,它有一个名为ReactiveObject 的东西。 Reactive 对象可帮助您轻松获取更改通知,但该库主要是为 WPF MVVM 应用程序设计的。

这是模型的状态更新如何与 ReactiveObject 一起进行的

public class Model : ReactiveObject
{
  int _currentPressure;
  public int CurrentPressure
 {
   get { return _currentPressure; }
   set { this.RaiseAndSetIfChagned(ref _currentPressure, value); }
 }
}

现在,只要您在应用程序中有 Model 对象,您就可以轻松获得一个 Observable,它将为您提供有关对象压力组件的更新。我可以使用WhenWhenAny 扩展方法。

但是,无论何时发生状态更改,您都不能使用 ReactiveUI 并拥有一个简单的 IObservable。

【讨论】:

    【解决方案2】:

    这样的事情可能会奏效,尽管您的要求对我来说并不完全清楚。

    private static readonly ConcurrentDictionary<Key, IObservable<Model>> cache = new...
    
    ...
    
    public IObservable<Model> GetModel(Key key)
    {
       return cache.GetOrAdd(key, CreateModelWithUpdates);
    }
    
    private IObservable<Model> CreateModelWithUpdates(Key key)
    {
       return Observable.Using(() => new Model(key), model => GetUpdates(model).StartWith(model))
                        .Publish((Model)null)
                        .RefCount()
                        .Where(model => model != null);
    }
    
    private IObservable<Model> GetUpdates(Model model) { ... }
    
    ...
    
    public class Model : IDisposable
    {
      ...
    }
    

    【讨论】:

    • 谢谢戴夫,这真的很有帮助。我认为这真的很接近我想要的,但我需要发布后的 StartWith,以便每个订阅者都能获得初始模型。我已经用我的最新代码更新了我的问题。不幸的是,StartWith 似乎发生在它上面的模型初始化之前。
    • @nzyme 是的,我的查询没有生成初始值(虽然这是偶然的——我只是没有测试它)。除非您需要我,否则我不会更正它,因为我对您的新代码有一个简单的修复,我将作为评论发布。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-09-21
    • 1970-01-01
    • 1970-01-01
    • 2020-03-29
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多