【问题标题】:Reactive services. Grouping and caching streams反应式服务。分组和缓存流
【发布时间】:2019-12-27 12:53:44
【问题描述】:

新:带有测试的整个源代码现在位于https://github.com/bboyle1234/ReactiveTest

假设我们有一个视图状态对象,它可以通过小的部分视图更改事件进行更新。以下是总视图、增量视图更新事件和构建总视图的累加器函数Update 的一些示例模型:

interface IDeviceView : ICloneable {
    Guid DeviceId { get; }
}

class DeviceTotalView : IDeviceView {
    public Guid DeviceId { get; set; }
    public int Voltage { get; set; }
    public int Currents { get; set; }
    public object Clone() => this.MemberwiseClone();
}

class DeviceVoltagesUpdateView : IDeviceView {
    public Guid DeviceId { get; set; }
    public int Voltage { get; set; }
    public object Clone() => this.MemberwiseClone();
}

class DeviceCurrentsUpdateView : IDeviceView {
    public Guid DeviceId { get; set; }
    public int Current { get; set; }
    public object Clone() => this.MemberwiseClone();
}

class DeviceUpdateEvent {
    public DeviceTotalView View;
    public IDeviceView LastUpdate;
}

static DeviceUpdateEvent Update(DeviceUpdateEvent previousUpdate, IDeviceView update) {
    if (update.DeviceId != previousUpdate.View.DeviceId) throw new InvalidOperationException("Device ids do not match (numskull exception).");
    var view = (DeviceTotalView)previousUpdate.View.Clone();
    switch (update) {
        case DeviceVoltagesUpdateView x: {
            view.Voltage = x.Voltage;
            break;
        }
        case DeviceCurrentsUpdateView x: {
            view.Currents = x.Current;
            break;
        }
    }
    return new DeviceUpdateEvent { View = view, LastUpdate = update };
}

接下来,假设我们已经有一个可注入服务,它能够为所有设备生成可观察到的小更新事件流,并且我们想要创建一个可以为单个设备生成聚合视图流的服务。

这是我们要创建的服务的接口:

interface IDeviceService {
    /// <summary>
    /// Gets an observable that produces aggregated update events for the device with the given deviceId.
    /// On subscription, the most recent event is immediately pushed to the subscriber.
    /// There can be multiple subscribers.
    /// </summary>
    IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId);
}

如何使用System.Reactive v4 库中的响应式扩展来实现此接口及其要求,以.netstandard2.0 为目标?这是我与 cmets 的锅炉代码,这是我所能得到的。

class DeviceService : IDeviceService {

    readonly IObservable<IDeviceView> Source;

    public DeviceService(IObservable<IDeviceView> source) { // injected parameter
        /// When injected here, "source" is cold in the sense that it won't produce events until the first time it is subscribed.
        /// "source" will throw an exception if its "Subscribe" method is called more than once as it is intended to have only one observer and 
        /// be read all the way from the beginning.
        Source = source;

        /// Callers of the "Subscribe" method below will expect data to be preloaded and will expect to be immediately delivered the most
        /// recent event. So we need to immediately subscribe to "source" and start preloading the aggregate streams.

        /// I'm assuming there is going to need to be a groupby to split the stream by device id.
        var groups = source.GroupBy(x => x.DeviceId);
        /// Now somehow we need to perform the aggregrate function on each grouping.
        /// And create an observable that immediately delivers the most recent aggregated event when "Subscribe" is called below.
    }

    public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId) {
        /// How do we implement this? The observable that we return must be pre-loaded with the latest update
        throw new NotImplementedException();
    }
}

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    你在那个要点中有一些奇怪的代码。这是我的工作:

    public class DeviceService : IDeviceService, IDisposable
    {
    
        readonly IObservable<IDeviceView> Source;
        private readonly Dictionary<Guid, IObservable<DeviceUpdateEvent>> _updateStreams = new Dictionary<Guid, IObservable<DeviceUpdateEvent>>();
        private readonly IObservable<(Guid, IObservable<DeviceUpdateEvent>)> _groupedStream;
        private readonly CompositeDisposable _disposable = new CompositeDisposable();
    
        public DeviceService(IObservable<IDeviceView> source)
        {
            Source = source;
    
            _groupedStream = source
                .GroupBy(v => v.DeviceId)
                .Select(o => (o.Key, o
                    .Scan(new DeviceUpdateEvent { View = DeviceTotalView.GetInitialView(o.Key), LastUpdate = null }, (lastTotalView, newView) => lastTotalView.Update(newView))
                    .Replay(1)
                    .RefCount()
                ));
    
            var groupSubscription = _groupedStream.Subscribe(t =>
            {
                _updateStreams[t.Item1] = t.Item2;
                _disposable.Add(t.Item2.Subscribe());
            });
            _disposable.Add(groupSubscription);
        }
    
        public void Dispose()
        {
            _disposable.Dispose();
        }
    
        public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId)
        {
            /// How do we implement this? The observable that we return must be pre-loaded with the latest update
            if(this._updateStreams.ContainsKey(deviceId))
                return this._updateStreams[deviceId];
            return _groupedStream
                .Where(t => t.Item1 == deviceId)
                .Select(t => t.Item2)
                .Switch();
    
    
        }
    }
    

    这里的肉是_groupedStream 块。正如您所说,您按 DeviceId 分组,然后使用 Scan 更新状态。我还将Update 移动到一个静态类并使其成为扩展方法。你需要一个初始状态,所以我修改了你的DeviceTotalView 类来获得它。相应修改:

    public class DeviceTotalView : IDeviceView
    {
        public Guid DeviceId { get; set; }
        public int Voltage { get; set; }
        public int Currents { get; set; }
        public object Clone() => this.MemberwiseClone();
        public static DeviceTotalView GetInitialView(Guid deviceId)
        {
            return new DeviceTotalView
            {
                DeviceId = deviceId,
                Voltage = 0,
                Currents = 0
            };
        }
    }
    

    接下来,.Replay(1).Refcount() 用于记住最近的更新,然后在订阅时提供。然后,我们将所有这些子 observables 填充到字典中,以便在方法调用时轻松检索。虚拟订阅 (_disposable.Add(t.Item2.Subscribe())) 是 Replay 工作所必需的。

    如果有一个尚未更新的 DeviceId 的早期请求,我们订阅 _groupedStream,它将等待第一次更新,生成该 Id 的可观察的,然后 .Switch 订阅它孩子可观察。

    但是,所有这些都对您的测试代码失败了,我猜是因为 ConnectableObservableForAsyncProducerConsumerQueue 类。我不想调试它,因为我不建议这样做。一般来说,不建议混合 TPL 和 Rx 代码。他们解决的问题在很大程度上是重叠的,而且会相互影响。所以我修改了你的测试代码,用重播主题替换了可连接的可观察队列。

    我还为早期请求添加了测试用例(在该设备的更新到达之前):

    DeviceUpdateEvent deviceView1 = null;
    DeviceUpdateEvent deviceView2 = null;
    DeviceUpdateEvent deviceView3 = null;
    
    var subject = new ReplaySubject<IDeviceView>();
    
    var id1 = Guid.NewGuid();
    var id2 = Guid.NewGuid();
    var id3 = Guid.NewGuid();
    
    subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 1 });
    subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 2 });
    subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 100 });
    
    var service = new DeviceService(subject);
    
    service.GetDeviceStream(id1).Subscribe(x => deviceView1 = x);
    service.GetDeviceStream(id2).Subscribe(x => deviceView2 = x);
    service.GetDeviceStream(id3).Subscribe(x => deviceView3 = x);
    
    /// I believe there is no need to pause here because the Subscribe method calls above 
    /// block until the events have all been pushed into the subscribers above.
    
    Assert.AreEqual(deviceView1.View.DeviceId, id1);
    Assert.AreEqual(deviceView2.View.DeviceId, id2);
    Assert.AreEqual(deviceView1.View.Voltage, 2);
    Assert.AreEqual(deviceView2.View.Voltage, 100);
    Assert.IsNull(deviceView3);
    
    subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 101 });
    Assert.AreEqual(deviceView2.View.Voltage, 101);
    
    subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id3, Voltage = 101 });
    Assert.AreEqual(deviceView3.View.DeviceId, id3);
    Assert.AreEqual(deviceView3.View.Voltage, 101);
    

    这很好,可以在没有异步的情况下运行。

    另外,作为一般提示,我建议使用 Microsoft.Reactive.Testing 包对 Rx 代码进行单元测试,而不是时间间隔的事情。

    【讨论】:

    • 做得很好 :) 有时调用者可能会在该特定 deviceId 的任何事件到达之前调用GetDeviceStream,在这种情况下它会抛出一个字典键缺失异常。我们如何返回一个还没有任何事件的可观察对象?
    • 整晚坐下来按 F5 等待您的回答是完全值得的。非常感谢。
    • 为什么要把RefCount()放在Replay(1)的末尾?
    • 更新了答案以支持早期请求。
    • Replay(1) 返回一个 IConnectableObservable,这是一种奇怪的动物。 RefCount 将可连接的 observable 转换为常规的 observable,这通常是您想要的。欲了解更多信息,请阅读此处:introtorx.com/Content/v1.0.10621.0/…
    【解决方案2】:

    非常感谢@Shlomo 的上述回答。

    接受的答案中给出的实现,虽然对我来说是一种神奇的教育,但有几个问题也需要依次解决。第一个是线程竞争问题,第二个是系统中有大量设备时的性能问题。我最终解决了线程竞赛并通过这个修改后的实现显着提高了性能:

    在构造函数中,分组和扫描的设备流直接订阅到BehaviorSubject,它实现了立即通知新订阅者流中的最新值所需的Replay(1).RefCount() 功能。

    GetDeviceStream 方法中,我们继续使用字典查找来查找设备流,如果字典中尚不存在BehaviorSubject,则创建一个预加载的BehaviorSubject。我们已经删除了上述问题中先前实现中存在的Where 搜索。使用 where 搜索导致了线程竞赛问题,该问题通过使分组流可重播得到解决。这导致了指数性能问题。将其替换为FirstOrDefault 将时间减少了一半,然后将其完全删除以支持GetCreate 字典技术,得到完美的性能 O(1) 而不是 O(n2)。

    GetCreateSubject 使用Lazy 代理对象作为字典值,因为ConcurrentDictionary 有时可以为单个键多次调用Create 方法。向字典提供 Lazy 可确保仅在其中一个惰性设备上调用 Value 属性,因此每个设备仅创建一个 BehaviorSubject

    class DeviceService : IDeviceService, IDisposable {
    
        readonly CompositeDisposable _disposable = new CompositeDisposable();
        readonly ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>> _streams = new ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>>();
        BehaviorSubject<DeviceUpdateEvent> GetCreateSubject(Guid deviceId) {
            return _streams.GetOrAdd(deviceId, Create).Value;
            Lazy<BehaviorSubject<DeviceUpdateEvent>> Create(Guid id) {
                return new Lazy<BehaviorSubject<DeviceUpdateEvent>>(() => {
                    var subject = new BehaviorSubject<DeviceUpdateEvent>(DeviceUpdateEvent.GetInitialView(deviceId));
                    _disposable.Add(subject);
                    return subject;
                });
            }
        }
    
        public DeviceService(IConnectableObservable<IDeviceView> source) {
            _disposable.Add(source
                .GroupBy(x => x.DeviceId)
                .Subscribe(deviceStream => {
                    _disposable.Add(deviceStream
                        .Scan(DeviceUpdateEvent.GetInitialView(deviceStream.Key), DeviceUtils.Update)
                        .Subscribe(GetCreateSubject(deviceStream.Key)));
                }));
            _disposable.Add(source.Connect());
        }
    
        public void Dispose() {
            _disposable.Dispose();
        }
    
        public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId) {
            return GetCreateSubject(deviceId).AsObservable();
        }
    }
    
    
    [TestMethod]
    public async Task Test2() {
        var input = new AsyncProducerConsumerQueue<IDeviceView>();
        var source = new ConnectableObservableForAsyncProducerConsumerQueue<IDeviceView>(input);
        var service = new DeviceService(source);
    
        var ids = Enumerable.Range(0, 100000).Select(i => Guid.NewGuid()).ToArray();
        var idsRemaining = ids.ToHashSet();
        var t1 = Task.Run(async () => {
            foreach (var id in ids) {
                await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id, Voltage = 1 });
            }
        });
        var t2 = Task.Run(() => {
            foreach (var id in ids) {
                service.GetDeviceStream(id).Subscribe(x => idsRemaining.Remove(x.View.DeviceId));
            }
        });
        await Task.WhenAll(t1, t2);
        var sw = Stopwatch.StartNew();
        while (idsRemaining.Count > 0) {
            if (sw.Elapsed.TotalSeconds > 600) throw new Exception("Failed");
            await Task.Delay(100);
        }
    }
    

    在此处查看完整的问题源代码和测试代码:https://github.com/bboyle1234/ReactiveTest

    【讨论】:

      猜你喜欢
      • 2017-07-05
      • 2019-07-20
      • 1970-01-01
      • 2017-09-13
      • 1970-01-01
      • 2021-12-05
      • 1970-01-01
      • 2011-03-19
      • 1970-01-01
      相关资源
      最近更新 更多