【问题标题】:RX: testing Sample\Interval\Switch pipelineRX:测试 Sample\Interval\Switch 管道
【发布时间】:2016-09-07 08:08:49
【问题描述】:

我有一些 RX 代码来测试事件流上的不活动状态,流上的活动会重置一个间隔,即不活动触发器。

public interface IReportActivity
{
    event EventHandler Activity;
}

public interface IInactivityMonitor
{
    IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout);
}

public class InactivityMonitor : IInactivityMonitor
{
    private readonly ISchedulerProvider _schedulerProvider;
    private readonly IReportActivity _activitySource;
    private IObservable<Unit> _inactivityObservable;

    public InactivityMonitor(IRaiseActivity activitySource, ISchedulerProvider schedulerProvider)
    {
        _activitySource = activitySource;
        _schedulerProvider = schedulerProvider;
    }

    public IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout)
    {
        return GetInactivityObservable()
            .Select(_ => Observable.Interval(inactivityTimeout, _schedulerProvider.NewThread)
                    .Timestamp()
                    .Select(__ => Unit.Default))
            .Switch();
    }

    public IObservable<Unit> GetInactivityObservable()
    {
        return _inactivityObservable = _inactivityObservable ??
            Observable.FromEventPattern<EventHandler<EventArgs>, EventArgs>(
                h => _activitySource.Activity += h,
                h => _activitySource.Activity -= h)
                .Sample(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread)
                .Select(_ => Unit.Default)
                .Publish()
                .RefCount();
    }
}

代码按要求工作(尽管我认为每秒重新创建间隔已经结束 - ObserveInactivity 应该在超时之前采样并根据最后一个活动的时间戳重置)

我遇到的真正问题是尝试测试这段代码。

[TestFixture]
public class InactivityMonitorTests
{
    private TestSchedulers _testSchedulers;
    private InactivityMonitor _sut;
    private AutoMock _moqqer;

    protected override void Setup()
    {
        base.Setup();

        _moqqer = new AutoMock()
        _testSchedulers = new TestSchedulers();
        _moqqer.Use<ISchedulerProvider>(_testSchedulers);
        _sut = Moqqer.CreateInstance<InactivityMonitor>();
    }

    // this test passes
    [Test]
    public void GetInactivityObservable_ActivityDetected_ReportsActivity()
    {
        var activityObserved = false;

        _sut.GetInactivityObservable()
           .Subscribe(x => activityObserved = true);

        RaiseActivityEvent();

        _testSchedulers.NewThread.AdvanceBy(TimeSpan.FromSeconds(11).Ticks);

        activityObserved.Should().BeTrue();
    }

    private void RaiseActivityEvent()
    {
        _moqqer.GetMock<IReportActivty>()
          .Raise(m => m.Activity += null, EventArgs.Empty);
    }

    // this test fails, The interval never appears to get set up via the tests.
    [Test]
    public void ObserveActivity_InactivtyTimeoutExceeded_NotificationReceived()
    {
       var inactivityObserved = false;

        _sut.ObserveInactivity(TimeSpan.FromSeconds(10))
           .Subscribe(x => inactivityObserved = true);

       _testSchedulers.NewThread.AdvanceBy(TimeSpan.FromSeconds(11).Ticks);

       inactivityObserved.Should().BeTrue();
    }
}

public interface ISchedulerProvider
{
    IScheduler CurrentThread { get; }
    IScheduler Dispatcher { get; }
    IScheduler Immediate { get; }
    IScheduler NewThread { get; }
    IScheduler ThreadPool { get; }

    IScheduler TaskPool { get; } 
}

public sealed class TestSchedulers : ISchedulerProvider
{
    private readonly TestScheduler _currentThread = new TestScheduler();
    private readonly TestScheduler _dispatcher = new TestScheduler();
    private readonly TestScheduler _immediate = new TestScheduler();
    private readonly TestScheduler _newThread = new TestScheduler();
    private readonly TestScheduler _threadPool = new TestScheduler();
    private readonly TestScheduler _taskPool = new TestScheduler();
    #region Explicit implementation of ISchedulerService
    IScheduler ISchedulerProvider.CurrentThread { get { return _currentThread; } }
    IScheduler ISchedulerProvider.Dispatcher { get { return _dispatcher; } }
    IScheduler ISchedulerProvider.Immediate { get { return _immediate; } }
    IScheduler ISchedulerProvider.NewThread { get { return _newThread; } }
    IScheduler ISchedulerProvider.ThreadPool { get { return _threadPool; } }
    IScheduler ISchedulerProvider.TaskPool { get { return _taskPool; } }

    #endregion
    public TestScheduler CurrentThread { get { return _currentThread; } }
    public TestScheduler Dispatcher { get { return _dispatcher; } }
    public TestScheduler Immediate { get { return _immediate; } }
    public TestScheduler NewThread { get { return _newThread; } }
    public TestScheduler ThreadPool { get { return _threadPool; } }
    public TestScheduler TaskPool { get { return _taskPool; } }
}

我尝试了各种方法来启动调度程序,在订阅之前和之后,尝试订阅另一个调度程序并在订阅之前启动它,但没有成功。

编辑:调试显示在 ObserveInactivity 方法中永远不会创建间隔。

希望有人能指出我正确的方向。

谢谢

【问题讨论】:

  • 我猜你的ISchedulerProviderTestSchedulers 来自introtorx.com
  • 是的,忘记在原帖中提及了。
  • 已经发布了调度器的代码

标签: c# .net system.reactive


【解决方案1】:

我会重新组织您的不活动查询,以便您不会从_activitySource.Activity 触发然后增加时间,而是希望在 XXX 期间出现故障/静默并为此发出通知。然后,如果有任何活动,则升级该查询以取消/重新启动。

即而不是

public IObservable<Unit> GetInactivityObservable()
{
    return _inactivityObservable = _inactivityObservable ??
        Observable.FromEventPattern<EventHandler<EventArgs>, EventArgs>(
            h => _activitySource.Activity += h,
            h => _activitySource.Activity -= h)
            .Sample(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread)
            .Select(_ => Unit.Default)
            .Publish()
            .RefCount();
}

把它改成

public IObservable<Unit> GetInactivityObservable()
{
    return _inactivityObservable = _inactivityObservable ??
        Observable.FromEventPattern<EventHandler<EventArgs>, EventArgs>(
            h => _activitySource.Activity += h,
            h => _activitySource.Activity -= h)
            .Select(_=>Unit.Default)
            .StartWith(Unit.Default)
            .Sample(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread)
            .Select(a=>Observable.Timer(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread).Select(_=>Unit.Default))
            .Switch()
            .Publish()
            .RefCount();
}

然后,由于 Rx 已经被延迟评估,我会将这个方法设为静态且无副作用,并在您的构造函数中调用它,或者只是内联它。

public class InactivityMonitor : IInactivityMonitor
{
    private readonly ISchedulerProvider _schedulerProvider;
    private readonly IObservable<Unit> _inactivityObservable;
    private static readonly TimeSpan SilencePeriod = TimeSpan.FromSeconds(1);

public InactivityMonitor(IRaiseActivity activitySource, ISchedulerProvider schedulerProvider)
{
    _schedulerProvider = schedulerProvider;
    _inactivityObservable = Observable.FromEventPattern<EventHandler<EventArgs>, EventArgs>(
            h => _activitySource.Activity += h,
            h => _activitySource.Activity -= h)
            .StartWith(null)
            .Select(a=>Observable.Timer(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread))
            .Switch()
            .Select(_=>Unit.Default)
            .Publish()
            .RefCount();
}

//...

}

【讨论】:

    【解决方案2】:

    所以,有机会回到这个问题,我看到了我的错误。

    Observable.Interval 永远不会启动,因为没有报告任何活动(由错误命名的 inactivity observable 报告)。在那里运行应用程序很可能至少引发了 1 个事件。为了解决这个问题,我添加了.StartWith(Unit.Default),如下所示,这允许每个观察者立即开始超时,这显然是原始实现中的一个潜在错误。

    public IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout)
    {
        return GetInactivityObservable()
            .StartWith(Unit.Default)  // <--
            .Select(_ => Observable.Interval(inactivityTimeout, _schedulerProvider.NewThread)
                    .Timestamp()
                    .Select(__ => Unit.Default))
            .Switch();
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-03-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多