【问题标题】:How to detect when NetworkStream finishes on Rx query如何检测 NetworkStream 何时完成 Rx 查询
【发布时间】:2015-05-20 15:10:20
【问题描述】:

我正在使用 Rx 从 NetworkStream 中读取数据并将结果作为 Hot Observable 提供。 即使查询效果很好,我也不确定基于 NetworkStream 完成序列的条件是否最合适。我遇到了序列完成而另一端的 TcpListener 尚未完成或关闭连接的情况。

这里是查询。我将不胜感激得到一些关于安全终止序列的正确条件的建议:

private IDisposable GetStreamSubscription(TcpClient client)
{
    return Observable.Defer(() => {
      var buffer = new byte[client.ReceiveBufferSize];

      return Observable.FromAsync<int>(() => {
         return client.GetStream ().ReadAsync (buffer, 0, buffer.Length);
      })
      .SubscribeOn(NewThreadScheduler.Default)
      .Select(x => buffer.Take(x).ToArray());
    })
    .Repeat()
    .TakeWhile(bytes => bytes.Any()) //This is the condition to review
    .Subscribe(bytes => {
        //OnNext Logic
    }, ex => {
        //OnError logic
    }, () => {
        //OnCompleted Logic
    });
}

为了明确我的问题,我需要知道检测网络流何时在另一端完成的最佳方法(由于断开连接、错误或其他原因)。现在我通过调用 ReadAsync 来做到这一点,直到没有返回任何字节,但我不知道这是否完全安全。

【问题讨论】:

  • 这段代码甚至无法编译。你能发布编译的代码吗?
  • 我刚刚编辑了代码,删除了无用的部分并使其可编译
  • 根据 MSDN 文档:如果当前可用的字节数小于请求的数量,则结果值可以小于请求的字节数,或者如果结束,则可以为 0(零)已达到流的最大数。
  • 谢谢。是的,但我不知道这种情况是否只会发生,是否仅到达流的末尾,或者是否存在任何其他情况,即使流尚未完成,Read 也可能返回 0 字节。我在其他一些地方读到,没有明确的方法可以 100% 确定这一点。
  • 我认为您不应该依赖 ReadAsync 来查看您是否阅读了传输的所有内容。我宁愿建议您使用某种可以解析的协议,以查看您是否收到了所有必需的数据。

标签: c# .net system.reactive tcpclient networkstream


【解决方案1】:

这是你想要的吗?

private IDisposable GetStreamSubscription(TcpClient client)
{
    return Observable
        .Defer(() =>
        {
            var buffer = new byte[client.ReceiveBufferSize];
            return Observable.Using(
                () => client.GetStream(),
                st => Observable.While(
                    () => st.DataAvailable,
                    Observable.Start(() =>
                    {
                        var bytes = st.Read(buffer, 0, buffer.Length);
                        return buffer.Take(bytes).ToArray();
                    })));
        })
        .SubscribeOn(NewThreadScheduler.Default)
        .Subscribe(bytes => {
            //OnNext Logic
        }, ex => {
            //OnError logic
        }, () => {
            //OnCompleted Logic
        });
}

请注意,这会安全地处理流并在不再有任何可用数据时结束。

【讨论】:

    猜你喜欢
    • 2012-06-30
    • 1970-01-01
    • 2019-07-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-12-08
    相关资源
    最近更新 更多