【发布时间】:2014-07-31 12:48:53
【问题描述】:
我正在开发一个服务器组件,该组件负责在内存中缓存模型,然后将任何更改流式传输到感兴趣的客户端。
当第一个客户端请求一个模型(模型密钥,每个模型都有一个密钥来识别它)时,模型将被创建(以及对下游系统的任何订阅),然后发送给客户端,然后是一个流更新(由下游系统生成)。任何后续客户端都应该再次使用更新流获取此缓存(更新)模型。当最后一个客户端取消订阅模型时,下游订阅应该被销毁并且缓存的模型被销毁。
关于 Rx 如何在这里提供帮助,谁能指出我正确的方向。我想目前我不清楚的是我如何同步(对象的)状态和变化流?我会有两个单独的 IObservable 用于模型和更新吗?
更新:这是我目前所拥有的:
Model model = null;
return Observable.Create((IObserver<ModelUpdate> observer) =>
{
model = _modelFactory.GetModel(key);
_backendThing.Subscribe(model, observer.OnNext);
return Disposable.Create(() =>
{
_backendThing.Unsubscribe(model);
});
})
.Do((u) => model.MergeUpdate(u))
.Buffer(_bufferLength)
.Select(inp => new ModelEvent(inp))
.Publish()
.RefCount()
.StartWith(new ModelEvent(model)
【问题讨论】:
-
如果有一个模型对象并且对象的生命周期很长,那么我建议每个模型都封装一个可以流式传输更改的 Observable。
-
那么会有很多模型对象。理想情况下,我想要一种可以订阅所有模型更新或特定模型更新的方式
-
@nzyme 在您最新的代码示例中,不要使用
StartWith,只需调用observer.OnNext(new ModelUpdate(model))之前订阅_backendThing。 -
@nzyme 另外,关闭
Observable.Create中的变量是一种代码异味。在您的特定情况下,这在技术上没有问题,因为您使用的是Publish().RefCount(),但如果可能的话,我可能会避免它。例如,ModelUpdate 应该公开一个 Model 属性,或者如果你不能这样做,那么定义一个结构,如:struct ModelNotification { ModelUpdate Update { get; } Model Model { get; } }
标签: system.reactive reactive-programming