【问题标题】:NetworkStream.ReadAsync with a cancellation token never cancels带有取消令牌的 NetworkStream.ReadAsync 永远不会取消
【发布时间】:2012-09-14 09:45:12
【问题描述】:

这里有证据。
知道这段代码有什么问题吗?

    [TestMethod]
    public void TestTest()
    {
        var tcp = new TcpClient() { ReceiveTimeout = 5000, SendTimeout = 20000 };
        tcp.Connect(IPAddress.Parse("176.31.100.115"), 25);
        bool ok = Read(tcp.GetStream()).Wait(30000);
        Assert.IsTrue(ok);
    }

    async Task Read(NetworkStream stream)
    {
        using (var cancellationTokenSource = new CancellationTokenSource(5000))
        {
            int receivedCount;
            try
            {
                var buffer = new byte[1000];
                receivedCount = await stream.ReadAsync(buffer, 0, 1000, cancellationTokenSource.Token);
            }
            catch (TimeoutException e)
            {
                receivedCount = -1;
            }
        }
    }

【问题讨论】:

  • 您能描述一下代码应该做什么以及它实际上为您做了什么吗?你是如何运行你的代码的?如果您直接从控制台应用程序运行它会有什么变化吗?
  • 重申一下:测试做什么
  • 我遇到了与HttpClient.GetStreamAsync(...).CopyToAsync(...) 相同的问题,它使用ReadOnlyStream 而不是WebExceptionWrapperStream 而不是ConnectStream,但仍然是base.ReadAsync(),它没有将cancellationToken 进一步传递到BeginEndReadAsync()

标签: .net sockets .net-4.5 async-await cancellation-token


【解决方案1】:

我终于找到了解决方法。使用 Task.WaitAny 将异步调用与延迟任务 (Task.Delay) 相结合。当延迟在 io 任务之前过去时,关闭流。这将强制任务停止。您应该正确处理 io 任务的异步异常。并且你应该为延迟任务和 io 任务添加一个延续任务。

它也适用于 tcp 连接。在另一个线程中关闭连接(您可以认为它是延迟任务线程)会强制所有使用/等待此连接停止的异步任务。

--编辑--

@vtortola 建议的另一个更清洁的解决方案:使用取消令牌注册对 stream.Close 的调用:

async ValueTask Read(NetworkStream stream, TimeSpan timeout = default)
{
    if(timeout == default(TimeSpan))
      timeout = TimeSpan.FromSeconds(5);

    using var cts = new CancellationTokenSource(timeout); //C# 8 syntax
    using(cts.Token.Register(() => stream.Close()))
    {
       int receivedCount;
       try
       {
           var buffer = new byte[30000];
           receivedCount = await stream.ReadAsync(buffer, 0, 30000, tcs.Token).ConfigureAwait(false);
       }
       catch (TimeoutException)
       {
           receivedCount = -1;
       }
    }
}

【讨论】:

  • 我发现的其他解决方案是在取消令牌的Register 方法中注册对Close 的调用。那么当取消令牌被取消时,它会自动调用Socket.Close方法。
  • 一点注意:在您的示例中,您在方法内创建令牌。通常,您将令牌作为参数,然后您需要将对 Register 的调用包含在“使用”中,以确保在离开方法后调用未注册,否则每次都会注册调用:)
  • 我最终做了类似的事情,但写了一条空消息(前缀和后缀但没有有效负载)到流而不是调用 stream.Close() 以避免 ObjectDisposed 异常。
  • 值得注意的是 NetworkStream.Dispose()(由 Close() 调用)不是正式的线程安全的。也就是说,Mono 实现似乎在尝试这样做,并且它和 MS 实现似乎都受限于 ObjectDisposedException(或者可能是 null 取消引用?)的风险。还值得注意的是,您真的没有更好的选择。无论如何,这就是我现在正在做的事情。 :o
  • 要记住的一点是,这将在超时时关闭底层连接。在某些情况下,保持连接打开可能是有意义的。
【解决方案2】:

取消是合作的。 NetworkStream.ReadAsync 必须配合才能取消。这样做有点困难,因为这可能会使流处于未定义状态。哪些字节已经从 Windows TCP 堆栈中读取,哪些还没有? IO 不容易取消。

Reflector 显示 NetworkStream 不会覆盖 ReadAsync。这意味着它将获得Stream.ReadAsync 的默认行为,它只是将令牌扔掉。没有通用的方法可以取消 Stream 操作,因此 BCL Stream 类甚至不尝试(它不能尝试 - 没有办法这样做)。

您应该在Socket 上设置超时。

【讨论】:

  • 太糟糕了,我认为网络运营是取消可能很有意义的情况之一,因为它们可能需要很长时间。
  • 是的。我希望 BCL 确实支持 CancelIO 处理所有常见的 IO 操作。它将与CancellationToken 完美集成。
  • 该文档说,当使用异步方法时,Socket 类上的所有超时都会被忽略“超时:此选项仅适用于同步接收调用。”。所以所有 .net 异步网络方法都是不可用的,因为它们可能会导致无限锁......
  • 链接到讨论相同问题的线程:stackoverflow.com/questions/10930052/… 指向该线程social.msdn.microsoft.com/Forums/da-DK/async/thread/…,它解释了 CancelIoEx 函数确实允许取消单个操作,但是是在 Vista 中引入的。但 .NET 4 仍支持 XP-SP3,因此 BCL 无法轻松使用该 API。
  • @Softlion 这个 API 很好。没有超时的异步套接字调用太离谱了!我不知道。这意味着默认情况下,许多使用 Socket 的新异步 C# 应用程序将被破坏。
【解决方案3】:

根据 Softlion 的回答中的描述:

使用 Task.WaitAny 将异步调用与延迟任务 (Task.Delay) 结合起来。当延迟在 io 任务之前过去时,关闭流。这将强制任务停止。您应该正确处理 io 任务的异步异常。你应该为delay任务和io任务添加一个延续任务。

我已经编写了一些代码,可以让您使用超时进行异步读取:

using System;
using System.Net.Sockets;
using System.Threading.Tasks;

namespace ConsoleApplication2013
{
    class Program
    {
        /// <summary>
        /// Does an async read on the supplied NetworkStream and will timeout after the specified milliseconds.
        /// </summary>
        /// <param name="ns">NetworkStream object on which to do the ReadAsync</param>
        /// <param name="s">Socket associated with ns (needed to close to abort the ReadAsync task if the timeout occurs)</param>
        /// <param name="timeoutMillis">number of milliseconds to wait for the read to complete before timing out</param>
        /// <param name="buffer"> The buffer to write the data into</param>
        /// <param name="offset">The byte offset in buffer at which to begin writing data from the stream</param>
        /// <param name="amountToRead">The maximum number of bytes to read</param>
        /// <returns>
        /// a Tuple where Item1 is true if the ReadAsync completed, and false if the timeout occurred,
        /// and Item2 is set to the amount of data that was read when Item1 is true
        /// </returns>
        public static async Task<Tuple<bool, int>> ReadWithTimeoutAsync(NetworkStream ns, Socket s, int timeoutMillis, byte[] buffer, int offset, int amountToRead)
        {
            Task<int> readTask = ns.ReadAsync(buffer, offset, amountToRead);
            Task timeoutTask = Task.Delay(timeoutMillis);

            int amountRead = 0;

            bool result = await Task.Factory.ContinueWhenAny<bool>(new Task[] { readTask, timeoutTask }, (completedTask) =>
            {
                if (completedTask == timeoutTask) //the timeout task was the first to complete
                {
                    //close the socket (unless you set ownsSocket parameter to true in the NetworkStream constructor, closing the network stream alone was not enough to cause the readTask to get an exception)
                    s.Close();
                    return false; //indicate that a timeout occurred
                }
                else //the readTask completed
                {
                    amountRead = readTask.Result;
                    return true;
                }
            });

            return new Tuple<bool, int>(result, amountRead);
        }

        #region sample usage
        static void Main(string[] args)
        {
            Program p = new Program();
            Task.WaitAll(p.RunAsync());
        }

        public async Task RunAsync()
        {
            Socket s = new Socket(SocketType.Stream, ProtocolType.Tcp);

            Console.WriteLine("Connecting...");
            s.Connect("127.0.0.1", 7894);  //for a simple server to test the timeout, run "ncat -l 127.0.0.1 7894"
            Console.WriteLine("Connected!");

            NetworkStream ns = new NetworkStream(s);

            byte[] buffer = new byte[1024];
            Task<Tuple<bool, int>> readWithTimeoutTask = Program.ReadWithTimeoutAsync(ns, s, 3000, buffer, 0, 1024);
            Console.WriteLine("Read task created");

            Tuple<bool, int> result = await readWithTimeoutTask;

            Console.WriteLine("readWithTimeoutTask is complete!");
            Console.WriteLine("Read succeeded without timeout? " + result.Item1 + ";  Amount read=" + result.Item2);
        }
        #endregion
    }
}

【讨论】:

    【解决方案4】:

    出现了一些问题:

    1. CancellationToken 抛出 OperationCanceledException,而不是 TimeoutException(取消并不总是由于超时)。
    2. ReceiveTimeout 不适用,因为您正在进行异步读取。即使是这样,您也会在 IOExceptionOperationCanceledException 之间出现竞争条件。
    3. 由于您正在同步连接套接字,因此您需要在此测试中设置较长的超时时间(IIRC,默认连接超时时间约为 90 秒,但可以随着 Windows 监控网络速度而更改)。
    4. 测试异步代码的正确方法是使用异步测试:

      [TestMethod]
      public async Task TestTest()
      {
          var tcp = new TcpClient() { ReceiveTimeout = 5000, SendTimeout = 20000 };
          tcp.Connect(IPAddress.Parse("176.31.100.115"), 25);
          await Read(tcp.GetStream());
      }
      

    【讨论】:

    • 1) 我知道。如果您尝试代码,它永远不会引发任何异常。
    • 2) 我知道。它用于为真实类中的内部方法提供超时,我用这个问题的真实值替换了它。
    • 3) 我同步连接套接字以演示 ReadAsync 的问题。 ConnectAsync 没有问题。好吧,它可能有问题,但我没有有效的测试用例。您可以停留 1000 万等待此方法返回。 1000 万远远超过 ~90 年代。所以这不是问题。
    • 4) 异步测试在 VS2012 上存在缺陷。它们无法正常工作,并且无法被测试资源管理器识别(所有项目都是 .net 4.5)。
    • @Softlion:这是我第一次听说有错误的异步测试。您是否已将此情况报告给 Microsoft Connect?
    【解决方案5】:

    提供有关三种不同方法的更多背景信息。我的服务监控其他 Web 应用程序的可用性。因此,它需要与各种网站建立大量连接。其中一些崩溃/返回错误/变得无响应。

    Y 轴 - 挂起的测试(会话)数。因部署/重启而降至 0。

    我。 (1 月 25 日)修改服务后,初始实现使用带有取消令牌的 ReadAsync。这导致大量测试挂起(对这些网站运行请求表明服务器有时确实没有返回内容)。

    二。 (2 月 17 日)部署了使用 Task.Delay 保护取消的更改。这完全解决了这个问题。

    private async Task<int> StreamReadWithCancellationTokenAsync(Stream stream, byte[] buffer, int count, Task cancellationDelayTask)
    {
        if (cancellationDelayTask.IsCanceled)
        {
            throw new TaskCanceledException();
        }
    
        // Stream.ReadAsync doesn't honor cancellation token. It only checks it at the beginning. The actual
        // operation is not guarded. As a result if remote server never responds and connection never closed
        // it will lead to this operation hanging forever.
        Task<int> readBytesTask = stream.ReadAsync(
            buffer,
            0,
            count);
        await Task.WhenAny(readBytesTask, cancellationDelayTask).ConfigureAwait(false);
    
        // Check whether cancellation task is cancelled (or completed).
        if (cancellationDelayTask.IsCanceled || cancellationDelayTask.IsCompleted)
        {
            throw new TaskCanceledException();
        }
    
        // Means that main task completed. We use Result directly.
        // If the main task failed the following line will throw an exception and
        // we'll catch it above.
        int readBytes = readBytesTask.Result;
    
        return readBytes;
    }
    

    III(3 月 3 日)在此 StackOverflow 之后实现了基于超时关闭流:

    using (timeoutToken.Register(() => stream.Close()))
    {
        // Stream.ReadAsync doesn't honor cancellation token. It only checks it at the beginning. The actual
        // operation is not guarded. As a result if a remote server never responds and connection never closed
        // it will lead to this operation hanging forever.
        // ReSharper disable once MethodSupportsCancellation
        readBytes = await targetStream.ReadAsync(
            buffer,
            0,
            Math.Min(responseBodyLimitInBytes - totalReadBytes, buffer.Length)).ConfigureAwait(false);
    }
    

    这个实现带来了挂起(与最初的方法不同):

    恢复到 Task.Delay 解决方案。

    【讨论】:

      【解决方案6】:

      提醒一下,await _stream.WriteAsync(message,cancellationToken);(_stream 是一个 SslStream)在执行 BeginEndWriteAsync 之前会在后台检查取消令牌是否已取消,因此您必须在令牌开始写入之前取消令牌。

      public virtual Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
          {
              // If cancellation was requested, bail early with an already completed task.
              // Otherwise, return a task that represents the Begin/End methods.
              return cancellationToken.IsCancellationRequested
                          ? Task.FromCanceled(cancellationToken)
                          : BeginEndWriteAsync(buffer, offset, count);
          }
      

      【讨论】:

      • 你是对的。我的错。但是,我相信我的回答仍然是相关的,因为问题更多的是围绕取消令牌。在取消令牌方面,ReadAsync 和 WriteAsync 的行为方式相同。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2013-12-06
      • 1970-01-01
      • 2014-07-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-04-05
      相关资源
      最近更新 更多