【问题标题】:.NET Rx - ReplaySubject buffer size not working.NET Rx - ReplaySubject 缓冲区大小不起作用
【发布时间】:2012-01-31 03:43:17
【问题描述】:

我一直在使用 .NET Reactive Extensions 来观察日志事件的到来。我目前正在使用一个派生自 IObservable 的类并使用 ReplaySubject 来存储日志,这样我就可以过滤和重播日志(例如:显示所有错误日志,或显示所有详细日志)而不会丢失我缓冲的日志。

问题是,即使我已经为主题设置了缓冲区大小:

this.subject = new ReplaySubject<LogEvent>(10);

当我使用 OnNext 在无限循环中添加到 observable 集合时,我的程序的内存使用量达到了顶峰:

internal void WatchForNewEvents()
        {
            Task.Factory.StartNew(() =>
                {
                    while (true)
                    {
                        dynamic parameters = new ExpandoObject();
                        // TODO: Add parameters for getting specific log events

                        if (this.logEventRepository.GetManyHasNewResults(parameters))
                        {
                            foreach (var recentEvent in this.logEventRepository.GetMany(parameters))
                            {
                                this.subject.OnNext(recentEvent);
                            }
                        }

                        // Commented this out for now to really see the memory go up 
                        // Thread.Sleep(1000); 
                    }
                });
        }

ReplaySubject 上的缓冲区大小不起作用吗?达到缓冲区大小时,它似乎没有清除缓冲区。非常感谢任何帮助!

更新:

我这样添加订阅者(这是错的吗?):

public IDisposable Subscribe(IObserver<LogEvent> observer)
        {
            return this.subject.Subscribe(observer);
        }

...这就是所谓的:

// Inserts into UI ListView
    this.logEventObservable.Subscribe(evt => this.InsertNewLogEvent(evt));

【问题讨论】:

  • 如果将订阅者添加到 ReplaySubject 会发生什么?我认为它不应该像这样泄漏,但我仍然很好奇会发生什么
  • 感谢@Paul,我添加了有关如何添加订阅者的更多信息。
  • 我已经隔离了这个问题,虽然我还不知道解决方案。在循环中调用 OnNext 之前订阅主题可防止内存泄漏。订阅之后会导致它。

标签: system.reactive


【解决方案1】:

我不确定这是否是明确的答案,但我怀疑您遇到问题是因为您使用的调度程序的并发性。您在 ReplaySubject 上调用的构造函数如下所示:

public ReplaySubject(int bufferSize)
    : this(bufferSize, TimeSpan.MaxValue, Scheduler.CurrentThread)
{ }

Scheduler.CurrentThread 让我担心。尝试将其更改为 Scheduler.ThreadPool 看看是否有帮助。

另外,作为旁注,您似乎将 Rx 与 TPL 和老式线程睡眠混合在一起。通常最好避免这样做。您可以将 WatchForNewEvents 代码更改为如下所示:

dynamic parameters = new ExpandoObject();

var newEvents =
    from n in Observable.Interval(TimeSpan.FromSeconds(1.0))
    where this.logEventRepository.GetManyHasNewResults(parameters)
    from recentEvent in
        this.logEventRepository.GetMany(parameters).ToObservable()
    select recentEvent;

newEvents.Subscribe(this.subject);

这是一种很好的紧凑的 Rx-y 做事方式。

【讨论】:

  • 感谢您的建议,虽然更改调度程序没有效果。我想知道我的 IObservable 实现是否不正确?
  • @NickRamirez - 很抱歉我的预感没有成功。我建议不要尝试实现IObservable - 这通常看起来更复杂,而且很容易出错。请改用内置运算符。
  • 最后,按照您的建议,使用 Observable.Interval 而不是 TPL 修复了内存泄漏。谢谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-06-11
  • 2020-09-05
  • 1970-01-01
  • 2017-08-16
  • 1970-01-01
相关资源
最近更新 更多