【问题标题】:Testing Rx & TcpClient测试 Rx 和 TcpClient
【发布时间】:2013-11-28 20:50:55
【问题描述】:

经过相当多的思考和在互联网上翻找,我想出了(我认为是)将 TcpIp 转换为 Observable 数据流的非常漂亮的封装:

public static IObservable<string> CreateFromTcpIp(IPAddress ip, int port)
{
    return 
        Observable.Using(
            () => new TcpClient(),                               //create client
            client => client.ConnectObservable(ip, port)         //connect async
                      .Select(_ => client.GetStream())            // get stream
                      .Select(stream => new BinaryReader(stream)) // get reader 
                      .SelectMany(reader => reader.ToObservable()) // see below
                      .Retry()                                // if error, retry 
            )
            .Publish().RefCount(); //only subscribe once for all subscribers
}

其中ConnectObservable 就是client.ConnectAsync(host, port).ToObservable(),而reader.ToObservable 看起来像这样:

private static IObservable<string> ToObservable(this BinaryReader source)
{
   return Observable.Create<string>(
       observer =>
       {
           return Observable.Generate(
               source,
               reader => true, //just keep on going
               reader => reader,
               reader => reader.ReadString())
           .Subscribe(observer);
       });
}

我面临的问题是如何进行测试。我想在调用堆栈中一直实现/包装接口。

我一直在尝试在我的测试类中使用 TcpListener 创建一个“服务器”,但是遇到了各种(可能不相关的)问题,让订阅和服务共存(订阅我的 CreateFromTcpIp 的结果似乎永远不会返回 IDisposable,在我的测试中插入延迟似乎最终会锁定程序集,因此我无法重建)。

是否有我遗漏的替代方法?

EDIT 订阅不返回的问题与对有故障的 Observable 进行重试有关。我问了另一个问题here

【问题讨论】:

  • 您是否考虑过使用 Subject(或 ISubject)而不是 IObservable?它非常适合单元测试。
  • 首先,取出Retry,直到完成单元测试。其次,将单元测试代码发布在您尝试设置TcpListener 的位置以及您尝试针对它测试您的方法的位置。
  • 所以......可能只是我,但这闻起来不香吗?我的意思是,这里似乎有很多假设......就像,如果流错误,我们应该永远重试。
  • 如果它也没有出错...... BinaryReader ToObservable 如果流完成会抛出错误,它将尝试读取流的末尾。
  • @JamesWorld,我想我已经找到了。在 Tcp 没有响应的情况下,是 ConnectAsync(或背后的 ToObservable)没有“告诉我”错误(它只在我 unsubscribe 时告诉我 - 没有帮助)。在它响应的情况下,我需要做一个SubscribeOn 来释放主线程,以便它可以将订阅返回给我。感谢您的帮助!

标签: c# system.reactive tcpclient


【解决方案1】:

我想你可以像这样测试BinaryReaderToObservable(如果你让它可以访问的话)。尽管我编写它是为了传递所写的代码(使用 nunit),但它期望异常的事实可能会突出一些问题:

[Test]
public void TestStreamToObservable()
{

    var expectedText = new List<string>
    {
        "A good test is simple.",
        "A rolling stone gathers no moss.",
        "Test properly"
    };

    var stream = new MemoryStream();
    var writer = new BinaryWriter(stream);
    expectedText.ForEach(writer.Write);
    writer.Flush();

    stream.Seek(0, SeekOrigin.Begin);
    var reader = new BinaryReader(stream);

    var resultText = new List<string>();

    Assert.Throws<EndOfStreamException>(
        () => reader.ToObservable().Subscribe(resultText.Add));

    CollectionAssert.AreEquivalent(expectedText, resultText);
}

至于其余部分,您通过创建一个new TcpClient(),除了集成测试之外,您已经把自己搞砸了。 new 的任何具体的东西总是会给测试带来麻烦。您可以重构并将Retry().Publish().RefCount() 拉出到一个单独的方法中,该方法在IObservable&lt;string&gt; 上运行。然后将TcpClient 位仅留给集成测试。

【讨论】:

  • 我知道你说的新建一个 TcpClient 是什么意思,但是如果我把它传入,我不应该也传入(隐式)新的 Stream 和新的 BinaryReader 吗?在这一点上,没有太多要测试的了(也许这就是重点?)。但这是我想测试我的代码的级联连接/流错误......
  • 奇怪,我以前有 reader.BaseStream.Position != reader.BaseStream.Length 而不是简单的 true 作为完成条件,它扔给我,所以我把 true 放进去。我试过把它放回去,而且是孤立的(正如你对这个测试的建议)它似乎没问题。谢谢你。
  • 小心,它不适用于不支持长度的底层流;真正的流不会。
  • 事实上,这巧妙地突出了测试套接字的难度。 Michael Perry 在pluralsight.com 上有一门很棒的课程,叫做“Provable Code”;在“模式”一章中,他重新构建了 Socket API - 这是对构建方式的所有错误的高度指导。 TcpClient 更好,但仍然存在问题。我在想你可能最好构建一个接受工厂函数的方法,该函数返回具有单个 BinaryReader 属性的一次性对象。然后你可以模拟所有的错误点,并封装 TcpClient goo。
  • 谢谢。正如你所知道的,在这个特定的领域(网络化的东西),我正在尽我所能地犯错。不过我打算试试工厂的想法。
【解决方案2】:

好的,为了后代,这是我在解决这个问题时学到的一些东西。 主要是关于使用 Rx 的微妙之处:

  • 如果你没有观察到OnError,它会抛出异常。
  • 除此之外,如果您在 Task 上执行了 ToObservable 并且您没有观察到 OnError,它只会在您取消订阅时抛出
  • 此外,在任务下游执行Retry 毫无意义:一旦引发异常,即为Result,它不会再尝试(我花了一段时间才解决这个问题!)。
  • 如果我的订阅永远不会返回,我需要 SubscribeOn 另一个线程 (Scheduler.Default)。
  • James 的回答中暗示了另一个错误:BinaryReader 上的 ToObservable 引发异常。这里重要的是它是Exception,而不是OnError。所以不可能有Retry

(小代码sn-p向我们自己证明这一点):

Observable.Generate(0, i => i < 4, i => i + 1, 
                    i => { if(i > 2) throw new Exception("sorry"); return i; })
          .Subscribe(i => Console.WriteLine(i), 
                     ex => Console.WriteLine(ex.Message));

鉴于此,我重新编码了我的ToObservable 以使用它(不太漂亮):

return Observable.Create<string>(obs =>
{
    var subscribed = true;
    var errored = false;
    string result = String.Empty;
    while(subscribed && !errored)
    {
        try
        {
            result = source.ReadString();
        }
        catch(Exception ex)
        {
            errored = true;
            obs.OnError(ex);
        }
        obs.OnNext(result); 
    }
    return Disposable.Create(() => subscribed = false);
});
  • 为了进一步发展,我想要一个不那么激进的Retry。这让我找到了this excellent answer。 (似乎不管我在 Rx 中写了什么,总有一种更优雅的方式来做到这一点)
  • James 的 Spy 扩展方法非常很有帮助。强烈推荐。

【讨论】:

  • 很好的验尸报告。我从没想过Retry 不处理任务——事实上,尝试重新订阅的 any 操作员显然会失败——而且这种行为没有在IObservable&lt;T&gt; 中公开,需要你小心这些 observables 是如何传递的。虽然这不是 Rx 中的一个独特问题,但我确实看到了很多设计为订阅一次的 observable 示例。
  • Observable.FromAsync 是您在每个订阅上重新启动任务所需的内容。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-06-22
  • 1970-01-01
  • 1970-01-01
  • 2017-04-18
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多