【问题标题】:SocketAsyncEventArgs Send/Receive OrderSocketAsyncEventArgs 发送/接收顺序
【发布时间】:2020-07-14 07:42:39
【问题描述】:

我最近在一个项目中使用了 SocketAsyncEventArgs,我遇到了一些问题,即 ReceiveAsync 有时会以与通过 SendAsync 发送的顺序不同的顺序获取数据。在 SendAsync 方法中发送的每个数据块都会被维护,但这些块的顺序不一定正确。也许我对 SendAsync 方法的理解不正确,但我认为特别是使用 SocketType.Stream 和 ProtocolType.Tcp 可以确保保持顺序。我知道底层过程将不可避免地破坏消息,并且 ReceiveAsync 通常会读取少于缓冲区分配的内容。但我假设发送和接收流会保持顺序。

我开发了一个显示问题的测试控制台程序。它每次尝试使用一组不同的套接字和端口运行大约 20 次。在我的笔记本电脑上,它通常会通过一次然后第二次失败;通常在期待第二个块时会收到一个稍后的块。从其他测试中,我知道预期的块最终确实会出现,只是不按顺序。

需要注意的是,我能够在 Windows 2008 远程服务器上对其进行测试并且没有任何问题。但是,它从未接近在我的笔记本电脑上完成。事实上,如果我让调试执行在异常中断中挂起一段时间,我已经让它不止一次地完全冻结了我的笔记本电脑,并且不得不进行硬重启。这是我在 Windows 7 上使用 VS2017 运行的工作笔记本电脑。我不确定这是否是一个因素,但它正在运行 Symantec Endpoint Protection,尽管我在日志中没有发现任何内容。

所以我的问题是,我对 SocketAsyncEventArgs 的运作方式有错误的看法吗?还是我的代码是一场灾难(也许两者兼而有之)?它是我的笔记本电脑独有的吗? (最后一个让我觉得我正在为编程新手做准备,并且您认为编译器一定有问题。)

using System;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

static class DumTest
{

    static void Main(string[] args)
    {
        for (int i = 9177; i < 9199; i++)
        {
            RunDum(i);
            //Thread.Sleep(350);
        }

        Console.WriteLine("all done.");
        Console.ReadLine();
    }

    static void RunDum(int port)
    {
        var dr = new DumReceiver(port);
        var ds = new DumSender(port);

        dr.Acception.Wait();

        ds.Connection.Wait();

        dr.Completion.Wait();

        ds.Completion.Wait();

        Console.WriteLine($"Completed {port}. " +
            $"sent: {ds.SegmentsSent} segments, received: {dr.SegmentsRead} segments");
    }
}

class DumReceiver
{
    private readonly SocketAsyncEventArgs eva = new SocketAsyncEventArgs();
    private readonly TaskCompletionSource<object> tcsAcc = new TaskCompletionSource<object>();

    private TaskCompletionSource<object> tcsRcv;
    private Socket socket;

    internal DumReceiver(int port)
    {
        this.eva.Completed += this.Received;

        var lstSock = new Socket(
            AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        var localIP = Dns.GetHostEntry(Dns.GetHostName()).AddressList
            .First(i => i.AddressFamily == AddressFamily.InterNetwork);

        lstSock.Bind(new IPEndPoint(localIP, port));
        lstSock.Listen(1);

        var saea = new SocketAsyncEventArgs();
        saea.Completed += this.AcceptCompleted;
        lstSock.AcceptAsync(saea);
    }

    internal Task Acception => this.tcsAcc.Task;

    internal Task Completion { get; private set; }

    internal int SegmentsRead { get; private set; }

    private void AcceptCompleted(object sender, SocketAsyncEventArgs e)
    {
        if (e.SocketError == SocketError.Success)
        {
            this.socket = e.AcceptSocket;
            e.Dispose();
            try
            {
                this.Completion = this.ReceiveLupeAsync();
            }
            finally
            {
                this.tcsAcc.SetResult(null);
            }
        }
        else
        {
            this.tcsAcc.SetException(new SocketException((int)e.SocketError));
        }
    }

    private async Task ReceiveLupeAsync()
    {
        var buf = new byte[8196];
        byte bufSeg = 1;
        int pos = 0;

        while (true)
        {
            this.tcsRcv = new TaskCompletionSource<object>();
            this.eva.SetBuffer(buf, pos, 8196 - pos);
            if (this.socket.ReceiveAsync(this.eva))
            {
                await this.tcsRcv.Task.ConfigureAwait(false);
            }

            if (this.eva.SocketError != SocketError.Success)
            {
                throw new SocketException((int)eva.SocketError);
            }

            if (this.eva.BytesTransferred == 0)
            {
                if (pos != 0)
                {
                    throw new EndOfStreamException();
                }

                break;
            }

            pos += this.eva.BytesTransferred;
            if (pos == 8196)
            {
                pos = 0;
                for (int i = 0; i < 8196; i++)
                {
                    if (buf[i] != bufSeg)
                    {
                        var msg = $"Expected {bufSeg} but read {buf[i]} ({i} of 8196). " +
                            $"Last read: {this.eva.BytesTransferred}.";
                        Console.WriteLine(msg);
                        throw new Exception(msg);
                    }
                }

                this.SegmentsRead++;
                bufSeg = (byte)(this.SegmentsRead + 1);
            }
        }
    }

    private void Received(object s, SocketAsyncEventArgs e) => this.tcsRcv.SetResult(null);
}

class DumSender
{
    private readonly SocketAsyncEventArgs eva = new SocketAsyncEventArgs();
    private readonly Socket socket = new Socket(
        AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

    private readonly TaskCompletionSource<object> tcsCon = new TaskCompletionSource<object>();
    private TaskCompletionSource<object> tcsSnd;

    internal DumSender(int port)
    {
        this.eva.Completed += this.Sent;

        var saea = new SocketAsyncEventArgs();
        var localIP = Dns.GetHostEntry(Dns.GetHostName()).AddressList
            .First(i => i.AddressFamily == AddressFamily.InterNetwork);

        saea.RemoteEndPoint = new IPEndPoint(localIP, port);
        saea.Completed += this.ConnectionCompleted;
        this.socket.ConnectAsync(saea);
    }

    internal Task Connection => this.tcsCon.Task;

    internal Task Completion { get; private set; }

    internal int SegmentsSent { get; private set; }

    private void ConnectionCompleted(object sender, SocketAsyncEventArgs e)
    {
        if (e.SocketError == SocketError.Success)
        {
            e.Dispose();

            try
            {
                this.Completion = this.SendLupeAsync();
            }
            finally
            {
                this.tcsCon.SetResult(null);
            }
        }
        else
        {
            this.tcsCon.SetException(new SocketException((int)e.SocketError));
        }
    }

    private async Task SendLupeAsync()
    {
        var buf = new byte[8196];
        byte bufSeg = 1;

        while (true)
        {
            for (int i = 0; i < 8196; i++)
            {
                buf[i] = bufSeg;
            }

            this.tcsSnd = new TaskCompletionSource<object>();
            this.eva.SetBuffer(buf, 0, 8196);
            if (this.socket.SendAsync(this.eva))
            {
                await this.tcsSnd.Task.ConfigureAwait(false);
            }

            if (this.eva.SocketError != SocketError.Success)
            {
                throw new SocketException((int)this.eva.SocketError);
            }

            if (this.eva.BytesTransferred != 8196)
            {
                throw new SocketException();
            }

            if (++this.SegmentsSent == 299)
            {
                break;
            }

            bufSeg = (byte)(this.SegmentsSent + 1);
        }

        this.socket.Shutdown(SocketShutdown.Both);
    }

    private void Sent(object s, SocketAsyncEventArgs e) => this.tcsSnd.SetResult(null);
}

【问题讨论】:

  • 我无法使用您发布的代码重现您的问题(Windows 10 Pro)。代码对我来说看起来不错。除非我需要最大的可扩展性,否则我不会为 Socket 中的 XXXAsync() 方法而烦恼;使用NetworkStream 和传统的await 友好的XXXAsync() 方法,代码更容易读写。但是我在代码中没有看到任何实际的 bug,虽然任何可用于套接字的异步机制都存在潜在的缓冲区重新排序问题,但只有当你有两个或更多读取操作同时排队,而您没有。
  • FWIW,我增加了您的测试代码以运行 22 端口测试的 20 次迭代,使用 2990 段而不是 299 段,但仍然无法使其失败。坦率地说,如果您有任何类型的 AV 软件,那么当您开始看到 I/O 代码出现意外行为时,您应该首先禁用它。这类软件中的错误太常见了,在向更大的社区寻求帮助之前,测试这种可能性太容易了,没有任何借口这样做。
  • 感谢您抽出宝贵时间对其进行测试,如果只是浪费时间,我们深表歉意。我天真地认为,因为连接是与本地主机建立的,所以它会超出 AV 的范围。我曾尝试将其关闭并以相同的结果对其进行测试。这种公司设置并没有使它非常直观或容易做到,因此它可能仍在运行。明天我会有更多的选择来测试它。感谢您的健全性检查。
  • 当然,没问题。需要明确的是:只要您一次只执行一个读取操作,您始终以正确的顺序获取数据。它是 TCP 的一个基本保证(实际上,我提到的重新排序问题,并不是因为数据最终乱序......缓冲区仍然按照它们呈现给网络层的顺序填充,它是只是处理 I/O 完成的线程有时会出现乱序)。因此,如果您确实看到数据乱序,那么某些东西肯定会妨碍并破坏事物。

标签: c# sockets socketasynceventargs


【解决方案1】:

我认为问题出在您的代码中。

您必须检查使用SocketAsyncEventArgsSocket*Async 方法的返回。如果它们返回false,则它们不会引发SocketAsyncEventArgs.Completed 事件,您必须同步处理结果。

参考文档:SocketAsyncEventArgs Class。搜索willRaiseEvent

DumReceiver的构造函数中,你不检查AcceptAsync的结果,也不处理同步完成的情况。

DumSender的构造函数中,你不检查ConnectAsync的结果,也不处理同步完成的情况。

除此之外,SocketAsyncEventArgs.Completed 事件可能在其他线程中引发,很可能是来自ThreadPool 的 I/O 线程。

每次分配给DumReceiver.tcsRcvDumSender.tcsSnd 时没有正确同步,您无法确定DumReceiver.ReceivedDumSender.Sent 使用的是最新的TaskCompletionSource

实际上,您可以在第一次迭代时获得NullReferenceException

您在以下方面缺乏同步:

  • DumReceiver,字段tcsRcvsocket以及属性CompletionSegmentsRead

  • DumSender,字段tcsSnd和属性CompletionSegmentsSent

我建议您考虑使用单个SemaphoreSlim,而不是在每次调用ReceiveAsyncSendAsync 时创建一个新的TaskCompletionSource。您将在构造函数中将信号量初始化为 0。如果*Async 操作处于挂起状态,您将在信号量上使用await WaitAsync,而Completed 事件将在信号量上使用Release

这应该足以摆脱TaskCompletionSource 字段中的竞争条件。您仍然需要在其他字段和属性上进行适当的同步。例如,没有理由不能在构造函数中创建Completion,而SegmentsReadSegmentsSent 可以是只读的,并引用一个可以通过@987654361 中的一个或多个在内部访问的字段@ 方法(例如 Interlocked.IncrementInterlocked.Add)。

【讨论】:

  • 给出的示例代码只是为了显示我正在努力解决的问题。如果代码用于除此之外的任何事情,我当然会更加注意如何处理 AcceptAsync 和 ConnectAsync。我清楚地描述的问题与处理成功的监听/连接无关。您的同步断言并不完全正确,因为发送方或接收方都没有同时运行操作。即,在前一个完成之前不会开始接收。我认为最初的 cmets 中的 Peter 在帮助我找出真正的问题方面做得很好。
  • 那么,真正的问题是什么?
  • 可能是我工作笔记本电脑上的防病毒软件。可能是其他东西,但仍然特定于我的机器。重要的是,当在任何其他机器上测试时,问题就消失了。
猜你喜欢
  • 2012-07-23
  • 2011-01-02
  • 2014-12-28
  • 1970-01-01
  • 2019-04-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多