【问题标题】:Why is TcpClient so slow and CPU hungry?为什么 TcpClient 这么慢而且 CPU 很饿?
【发布时间】:2013-07-20 04:50:19
【问题描述】:

与我的other question 相关,但现在我尝试异步,希望它能解决问题。没有。

我正在尝试创建一个简单的 SOCKS5 服务器。我将浏览器(firefox)设置为将此程序用作 SOCKS5。这个想法是一个程序连接到代理服务器,给它所需的信息,服务器只是简单地从一个连接读取/写入数据到另一个。这个只是简单地做到这一点,不记录也不过滤任何东西。它非常简单,但是由于 CPU 问题以及在您点击几页后连接到站点需要几秒钟的事实,这使得它完全无法使用。这到底是怎么吃掉这么多CPU的?为什么连接网站需要很长时间?异步和同步都受此影响

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net.Sockets;
using System.Timers;
using System.IO;
using System.Net;
using System.Threading;

namespace ProxyTest
{
    class Program
    {
        static ManualResetEvent tcpClientConnected =new ManualResetEvent(false);
        static void Main(string[] args)
        {
            var s2 = new TcpListener(9998);
            s2.Start();
            Task.Run(() =>
            {
                while (true)
                {
                    tcpClientConnected.Reset();
                    s2.BeginAcceptTcpClient(Blah, s2);
                    tcpClientConnected.WaitOne();
                }
            });
            while (true)
                System.Threading.Thread.Sleep(10000000);
        }

        static void Blah(IAsyncResult ar)
        {
            try
            {
                Console.WriteLine("Connection");
                TcpListener listener = (TcpListener)ar.AsyncState;
                using (var socketin = listener.EndAcceptTcpClient(ar))
                {
                    tcpClientConnected.Set();
                    var ns1 = socketin.GetStream();
                    var r1 = new BinaryReader(ns1);
                    var w1 = new BinaryWriter(ns1);

                    if (false)
                    {
                        var s3 = new TcpClient();
                        s3.Connect("127.0.0.1", 9150);
                        var ns3 = s3.GetStream();
                        var r3 = new BinaryReader(ns3);
                        var w3 = new BinaryWriter(ns3);
                        while (true)
                        {
                            while (ns1.DataAvailable)
                            {
                                var b = ns1.ReadByte();
                                w3.Write((byte)b);
                                //Console.WriteLine("1: {0}", b);
                            }
                            while (ns3.DataAvailable)
                            {
                                var b = ns3.ReadByte();
                                w1.Write((byte)b);
                                Console.WriteLine("2: {0}", b);
                            }
                        }
                    }

                    {
                        if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
                            return;
                        var c = r1.ReadByte();
                        for (int i = 0; i < c; ++i)
                            r1.ReadByte();
                        w1.Write((byte)5);
                        w1.Write((byte)0);
                    }
                    {
                        if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
                            return;
                        if (r1.ReadByte() != 0)
                            return;
                    }
                    byte[] ipAddr = null;
                    string hostname = null;
                    var type = r1.ReadByte();
                    switch (type)
                    {
                        case 1:
                            ipAddr = r1.ReadBytes(4);
                            break;
                        case 3:
                            hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte()));
                            break;
                        case 4:
                            throw new Exception();
                    }
                    var nhport = r1.ReadInt16();
                    var port = IPAddress.NetworkToHostOrder(nhport);

                    var socketout = new TcpClient();
                    if (hostname != null)
                        socketout.Connect(hostname, port);
                    else
                        socketout.Connect(new IPAddress(ipAddr), port);

                    w1.Write((byte)5);
                    w1.Write((byte)0);
                    w1.Write((byte)0);
                    w1.Write(type);
                    switch (type)
                    {
                        case 1:
                            w1.Write(ipAddr);
                            break;
                        case 2:
                            w1.Write((byte)hostname.Length);
                            w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length);
                            break;
                    }
                    w1.Write(nhport);

                    var buf1 = new byte[4096];
                    var buf2 = new byte[4096];
                    var ns2 = socketout.GetStream();
                    var r2 = new BinaryReader(ns2);
                    var w2 = new BinaryWriter(ns2);
                    Task.Run(() =>
                    {
                        var re = new ManualResetEvent(false);
                        while (true)
                        {
                            re.Reset();
                            ns1.BeginRead(buf1, 0, buf1.Length, ReadCallback, new A() { buf = buf1, thisSocket = socketin, otherSocket = socketout, thisStream = ns1, otherStream = ns2, re=re });
                            re.WaitOne();
                        }
                    });
                    Task.Run(() =>
                    {
                        var re = new ManualResetEvent(false);
                        while (true)
                        {
                            re.Reset();
                            ns2.BeginRead(buf2, 0, buf2.Length, ReadCallback, new A() { buf = buf2, thisSocket = socketout, otherSocket = socketin, thisStream = ns2, otherStream = ns1, re = re });
                            re.WaitOne();
                        }
                    });
                    while (true)
                    {
                        if (socketin.Connected == false)
                            return;
                        Thread.Sleep(100);
                    }
                }
            }
            catch { }
        }
        class A { public byte[] buf; public TcpClient thisSocket, otherSocket; public NetworkStream thisStream, otherStream; public ManualResetEvent re;};
        static void ReadCallback(IAsyncResult ar)
        {
            try
            {
                var a = (A)ar.AsyncState;
                var ns1 = a.thisStream;
                var len = ns1.EndRead(ar);
                a.otherStream.Write(a.buf, 0, len);
                a.re.Set();
            }
            catch
            {
            }
        }
    }
}

【问题讨论】:

  • 我认为问题与您的其他同步问题相同(我已在此处发表评论):socketin.Connected == false 测试永远不会正确,因此代码将所有时间都花在Thread.Sleep,所有 Blah 调用都不会终止。

标签: c# .net sockets proxy tcpclient


【解决方案1】:

这里发生了一些事情!

异步调用都称为同步风格。例如,启动操作的线程调用 WaitOne - 这基本上只是使其等同于同步调用,没有什么不同。

睡眠循环不好。 sleep(1) 循环将快速响应但使用一些 CPU,sleep(1000) 循环将响应缓慢但使用较少 CPU。 在睡眠循环中拥有十几个线程并不会占用太多 CPU,但如果线程数不断增加,CPU 使用率将变得非常重要。 最好的方法是使用异步调用而不是轮询。

大量任务循环运行。如果没有保证退出路径,这些会导致线程数猛增。

如果要将数据从套接字 A 转发到套接字 B,则需要在任一套接字关闭时采取行动:停止转发,确保待处理的写入完成并关闭套接字。

当前的实现不能正确确保两个转发任务在关闭时都关闭,如果任务在设置事件之前发生异常,则启动任务然后阻塞手动重置事件的技术可能会失败。这两种情况都会使任务无限运行。

检查Socket.Connected 似乎是一件显而易见的事情,但实际上这只是最后一次 IO 操作是否遇到断开连接的缓存。 我更喜欢对“零接收”采取行动,这是您断开连接的第一个通知。

我通过 NuGet 使用 PowerThreading 为您的原始同步例程构建了一个快速异步版本(这是在框架 4.5 之前执行异步例程的一种方式)。 这可以使用 TcpListener,CPU 使用率为零且线程数非常少。

这可以在 vanilla c# 中使用 async/await 来完成...我只是还不知道怎么做 :)

using System;
using System.Collections.Generic;
using System.Text;

namespace AeProxy
{
    using System.IO;
    using System.Net;
    using System.Net.Sockets;
    using System.Threading;
    // Need to install Wintellect.Threading via NuGet for this:
    using Wintellect.Threading.AsyncProgModel;

    class Program
    {
        static void Main(string[] args)
        {
            var ae = new AsyncEnumerator() {SyncContext = null};
            var mainOp = ae.BeginExecute(ListenerFiber(ae), null, null);
            // block until main server is finished
            ae.EndExecute(mainOp);
        }

        static IEnumerator<int> ListenerFiber(AsyncEnumerator ae)
        {
            var listeningServer = new TcpListener(IPAddress.Loopback, 9998);
            listeningServer.Start();
            while (!ae.IsCanceled())
            {
                listeningServer.BeginAcceptTcpClient(ae.End(0, listeningServer.EndAcceptTcpClient), null);
                yield return 1;
                if (ae.IsCanceled()) yield break;
                var clientSocket = listeningServer.EndAcceptTcpClient(ae.DequeueAsyncResult());
                var clientAe = new AsyncEnumerator() { SyncContext = null };
                clientAe.BeginExecute(
                    ClientFiber(clientAe, clientSocket),
                    ar =>
                        {
                            try
                            {
                                clientAe.EndExecute(ar);
                            }
                            catch { }
                    }, null);
            }
        }

        static long clients = 0;

        static IEnumerator<int> ClientFiber(AsyncEnumerator ae, TcpClient clientSocket)
        {
            Console.WriteLine("ClientFibers ++{0}", Interlocked.Increment(ref clients));
            try
            {
                // original code to do handshaking and connect to remote host
                var ns1 = clientSocket.GetStream();
                var r1 = new BinaryReader(ns1);
                var w1 = new BinaryWriter(ns1);

                if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1)) yield break;
                var c = r1.ReadByte();
                for (int i = 0; i < c; ++i) r1.ReadByte();
                w1.Write((byte)5);
                w1.Write((byte)0);

                if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1)) yield break;
                if (r1.ReadByte() != 0) yield break;

                byte[] ipAddr = null;
                string hostname = null;
                var type = r1.ReadByte();
                switch (type)
                {
                    case 1:
                        ipAddr = r1.ReadBytes(4);
                        break;
                    case 3:
                        hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte()));
                        break;
                    case 4:
                        throw new Exception();
                }
                var nhport = r1.ReadInt16();
                var port = IPAddress.NetworkToHostOrder(nhport);
                var socketout = new TcpClient();
                if (hostname != null) socketout.Connect(hostname, port);
                else socketout.Connect(new IPAddress(ipAddr), port);
                w1.Write((byte)5);
                w1.Write((byte)0);
                w1.Write((byte)0);
                w1.Write(type);
                switch (type)
                {
                    case 1:
                        w1.Write(ipAddr);
                        break;
                    case 3:
                        w1.Write((byte)hostname.Length);
                        w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length);
                        break;
                }
                w1.Write(nhport);
                using (var ns2 = socketout.GetStream())
                {
                    var forwardAe = new AsyncEnumerator() { SyncContext = null };
                    forwardAe.BeginExecute(
                        ForwardingFiber(forwardAe, ns1, ns2), ae.EndVoid(0, forwardAe.EndExecute), null);
                    yield return 1;
                    if (ae.IsCanceled()) yield break;
                    forwardAe.EndExecute(ae.DequeueAsyncResult());
                }
            }
            finally
            {
                Console.WriteLine("ClientFibers --{0}", Interlocked.Decrement(ref clients));
            }
        }

        private enum Operation { OutboundWrite, OutboundRead, InboundRead, InboundWrite } 

        const int bufsize = 4096;

        static IEnumerator<int> ForwardingFiber(AsyncEnumerator ae, NetworkStream inputStream, NetworkStream outputStream)
        {
            while (!ae.IsCanceled())
            {
                byte[] outputRead = new byte[bufsize], outputWrite = new byte[bufsize];
                byte[] inputRead = new byte[bufsize], inputWrite = new byte[bufsize];
                // start off output and input reads.
                // NB ObjectDisposedExceptions can be raised here when a socket is closed while an async read is in progress.
                outputStream.BeginRead(outputRead, 0, bufsize, ae.End(1, ar => outputStream.EndRead(ar)), Operation.OutboundRead);
                inputStream.BeginRead(inputRead, 0, bufsize, ae.End(1, ar => inputStream.EndRead(ar)), Operation.InboundRead);
                var pendingops = 2;
                while (!ae.IsCanceled())
                {
                    // wait for the next operation to complete, the state object passed to each async
                    // call can be used to find out what completed.
                    if (pendingops == 0) yield break;
                    yield return 1;
                    if (!ae.IsCanceled())
                    {
                        int byteCount;
                        var latestEvent = ae.DequeueAsyncResult();
                        var currentOp = (Operation)latestEvent.AsyncState;
                        if (currentOp == Operation.InboundRead)
                        {
                            byteCount = inputStream.EndRead(latestEvent);
                            if (byteCount == 0)
                            {
                                pendingops--;
                                outputStream.Close();
                                continue;
                            }
                            Array.Copy(inputRead, outputWrite, byteCount);
                            outputStream.BeginWrite(outputWrite, 0, byteCount, ae.EndVoid(1, outputStream.EndWrite), Operation.OutboundWrite);
                            inputStream.BeginRead(inputRead, 0, bufsize, ae.End(1, ar => inputStream.EndRead(ar)), Operation.InboundRead);
                        }
                        else if (currentOp == Operation.OutboundRead)
                        {
                            byteCount = outputStream.EndRead(latestEvent);
                            if (byteCount == 0)
                            {
                                pendingops--;
                                inputStream.Close();
                                continue;
                            }
                            Array.Copy(outputRead, inputWrite, byteCount);
                            inputStream.BeginWrite(inputWrite, 0, byteCount, ae.EndVoid(1, inputStream.EndWrite), Operation.InboundWrite);
                            outputStream.BeginRead(outputRead, 0, bufsize, ae.End(1, ar => outputStream.EndRead(ar)), Operation.OutboundRead);
                        }
                        else if (currentOp == Operation.InboundWrite)
                        {
                            inputStream.EndWrite(latestEvent);
                        }
                        else if (currentOp == Operation.OutboundWrite)
                        {
                            outputStream.EndWrite(latestEvent);
                        }
                    }
                }
            }
        }
    }
}

【讨论】:

  • 抱歉,距离这次赏金活动只有 2 小时,我会离开。这段代码比其他答案有更多的例外。它似乎使用了更多的连接,但我不完全确定,所以我不会计算在内。我真的很想知道为什么我的代码很慢,以及如何将它写得更快,而另一个答案解释得更多。我认为 msdn beginread/write 是有缺陷的,尽管我不相信它的工作方式相同。实际上,它绝对不会像我的代码的几个不同变体那样使用大量 cpu 的方式显示 beginread 而这并没有
  • 抱歉,我确实试图解释你的代码出了什么问题,@felega 的答案并不是你想要做的异步的。我的示例是通过 TcpClient 使用 BeginRead/BeginWrite,证明它没有问题。
【解决方案2】:

您应该使用异步版本的 TcpClient 方法而不是生成线程。

【讨论】:

  • 我使用 BeginAcceptTcpClient,BeginRead 我怎么不使用异步 tcpclient
【解决方案3】:

警告:由于我没有使用 4.5,因此我不得不稍微调整一下。

Task.Run() --> new Thread().Start()

您使用的线程过多。 简单地尝试在 stackoverflow 中加载这个问题会导致 30 多个线程产生,这重现了使用 Task.Run() 看到的行为。

随着您的代码减少到每个连接一个线程,我的 CPU 使用率徘徊在 0% 左右。一切都加载得很快。

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.ComponentModel;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net.Sockets;
using System.Timers;
using System.IO;
using System.Net;
using System.Threading;

namespace SOCKS5
{
    static class Program
    {
        static void Main()
        {
            var s2 = new TcpListener(9998);
            s2.Start();

            while (true)
            {
                if (s2.Pending())
                {
                    Thread test = new Thread(() =>
                    {
                        using (TcpClient client = s2.AcceptTcpClient())
                        {
                            Blah(client);
                        }
                    });

                    test.Start();
                }

                Thread.Sleep(10);
            }
        }

        static void Blah(TcpClient listener)
        {
            try
            {
                Console.WriteLine("Connection");
                //TcpListener listener = (TcpListener)ar.AsyncState;


                //tcpClientConnected.Set();
                var ns1 = listener.GetStream();
                var r1 = new BinaryReader(ns1);
                var w1 = new BinaryWriter(ns1);

                if (false)
                {
                    var s3 = new TcpClient();
                    s3.Connect("127.0.0.1", 9150);
                    var ns3 = s3.GetStream();
                    var r3 = new BinaryReader(ns3);
                    var w3 = new BinaryWriter(ns3);
                    while (true)
                    {
                        while (ns1.DataAvailable)
                        {
                            var b = ns1.ReadByte();
                            w3.Write((byte)b);
                            //Console.WriteLine("1: {0}", b);
                        }
                        while (ns3.DataAvailable)
                        {
                            var b = ns3.ReadByte();
                            w1.Write((byte)b);
                            Console.WriteLine("2: {0}", b);
                        }
                    }
                }

                {
                    if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
                        return;
                    var c = r1.ReadByte();
                    for (int i = 0; i < c; ++i)
                        r1.ReadByte();
                    w1.Write((byte)5);
                    w1.Write((byte)0);
                }
                {
                    if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
                        return;
                    if (r1.ReadByte() != 0)
                        return;
                }
                byte[] ipAddr = null;
                string hostname = null;
                var type = r1.ReadByte();
                switch (type)
                {
                    case 1:
                        ipAddr = r1.ReadBytes(4);
                        break;
                    case 3:
                        hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte()));
                        break;
                    case 4:
                        throw new Exception();
                }
                var nhport = r1.ReadInt16();
                var port = IPAddress.NetworkToHostOrder(nhport);

                var socketout = new TcpClient();
                if (hostname != null)
                    socketout.Connect(hostname, port);
                else
                    socketout.Connect(new IPAddress(ipAddr), port);

                w1.Write((byte)5);
                w1.Write((byte)0);
                w1.Write((byte)0);
                w1.Write(type);
                switch (type)
                {
                    case 1:
                        w1.Write(ipAddr);
                        break;
                    case 2:
                        w1.Write((byte)hostname.Length);
                        w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length);
                        break;
                }
                w1.Write(nhport);

                var buf1 = new byte[4096];
                var buf2 = new byte[4096];
                var ns2 = socketout.GetStream();

                DateTime last = DateTime.Now;

                while ((DateTime.Now - last).TotalMinutes < 5.0)
                {
                    if (ns1.DataAvailable)
                    {
                        int size = ns1.Read(buf1, 0, buf1.Length);
                        ns2.Write(buf1, 0, size);
                        last = DateTime.Now;
                    }
                    if (ns2.DataAvailable)
                    {
                        int size = ns2.Read(buf2, 0, buf2.Length);
                        ns1.Write(buf2, 0, size);
                        last = DateTime.Now;
                    }

                    Thread.Sleep(10);
                }
            }
            catch { }
            finally
            {
                try
                {
                    listener.Close();
                }
                catch (Exception) { }
            }
        }
    }
}

编辑:

这最终变得有点有趣。

在路由 Firefox 流量几个小时后,一些观察结果。

从未注意到确定何时关闭连接的常规模式。让线程在空闲 5 分钟后终止(无 rx/tx)会使线程计数相当低。这是一个相当安全的界限,允许 gmail 聊天等服务继续运行。

由于某种原因,程序偶尔不会收到来自浏览器的请求,这会报告超时。程序中没有关于错过请求的通知,什么都没有。仅在浏览 stackoverflow 时才注意到。还没想好。

【讨论】:

  • 再看一遍,ManualSetEvent 可能是问题所在。阻塞等待被移除。这是我能看到的唯一真正的区别。
  • 嗯。那可能是真的。无论哪种情况,我都认为代码这么慢是荒谬的。我忘了说很可能我会将此标记为答案。
  • 更新了帖子,添加了一些关于长期使用的观察结果。
  • 酷我很高兴你喜欢这个简单的 socks 实现。如果您要玩更多,您是否介意做一些事情(断点、打印线等)来查看您是否曾经使用域名而不是始终使用 ipv4 收到请求?我总是得到ipv4。如果有的话,还有更新代码的要点。我还发现 sleep 1 效果很好(在线程中,而不是 main())
  • @felega - 请注意 Task.Run()NOT 等同于 new Thread().Start()。 Task.Run 仅在某个线程池上调度任务,它不会生成和运行线程。因此,虽然 30 Thread().Start() 将产生 30 个线程,但 30 Task.Run() 将产生尽可能多的系统将在当时找到合适的结果
【解决方案4】:

您应该看看重叠的 I/O。 每个连接一个线程可能工作正常,但总的来说很糟糕。

【讨论】:

  • 我不确定如何让一个线程处理多个连接,但其中一个答案确实给了我很好的代码(尽管我将 sleep 10 更改为 1)
  • 阅读这个:codeproject.com/Articles/10280/…这就是你应该在 Windows 上这样做的方式,即使你不需要数千个并发连接
【解决方案5】:

在这一行……​​

 while (true)
                    System.Threading.Thread.Sleep(10000000);

用一个简单的替换它不会更好:

Console.ReadKey();

是我看到的唯一 CPU 消耗。

另外,作为建议,您应该限制传入连接的数量并使用线程池模式(在队列中或其他东西中)。

【讨论】:

  • 这是一个更好的评论而不是一个答案。睡眠与问题无关。如果你不相信我,你可以运行代码。它与从套接字或 WaitOne 读取/写入有关
  • System.Threading.Thread.Sleep 不是旋转等待。它不应该占用 CPU。
猜你喜欢
  • 2012-04-06
  • 2023-01-17
  • 2023-04-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-10-19
  • 2016-10-13
  • 1970-01-01
相关资源
最近更新 更多