【问题标题】:ZeroMQ performance issueZeroMQ 性能问题
【发布时间】:2013-10-24 06:34:46
【问题描述】:

我对 ZeroMQ 有疑问,我认为这是因为我对它不是很熟悉。

我正在尝试构建一个非常简单的服务,其中多个客户端连接到服务器并发送查询。服务器响应此查询。

当我使用 REQ-REP 套接字组合(客户端使用 REQ,服务器绑定到 REP 套接字)时,我能够在服务器端每秒获得接近 60,000 条消息(当客户端和服务器在同一台机器上时)。当分布在不同机器上时,不同机器上的每个新客户端实例都会线性增加服务器每秒的消息数,并且如果有足够的客户端实例,则可以轻松达到 40,000+。

现在 REP 套接字阻塞了,所以我按照 ZeroMQ 指南使用了 rrbroker 模式 (http://zguide.zeromq.org/cs:rrbroker):

REQ (client) <----> [server ROUTER -- DEALER --- REP (workers running on different threads)]

但是,这完全破坏了性能。跨机器运行时,我在服务器上每秒只能收到大约 4000 条消息。不仅如此,在不同机器上启动的每个新客户端都会降低其他所有客户端的吞吐量。

我很确定我在做一些愚蠢的事情。我想知道这里的 ZeroMQ 专家是否可以指出任何明显的错误。谢谢!

编辑:根据建议添加代码。我正在使用 clrzmq nuget 包 (https://www.nuget.org/packages/clrzmq-x64/)

这是客户端代码。计时器会计算每秒收到的响应数。

for (int i = 0; i < numTasks; i++) { Task.Factory.StartNew(() => Client(), TaskCreationOptions.LongRunning); }

void Client()
    {
        using (var ctx = new Context())
        {
            Socket socket = ctx.Socket(SocketType.REQ);
            socket.Connect("tcp://192.168.1.10:1234");
            while (true)
            {
                socket.Send("ping", Encoding.Unicode);
                string res = socket.Recv(Encoding.Unicode);
            }
        }
    }

服务器 - 案例 1:服务器跟踪每秒收到的请求数

using (var zmqContext = new Context())
{
    Socket socket = zmqContext.Socket(SocketType.REP);
    socket.Bind("tcp://*:1234");
    while (true)
    {
        string q = socket.Recv(Encoding.Unicode);
        if (q.CompareTo("ping") == 0) {
            socket.Send("pong", Encoding.Unicode);
        }
    }
}       

通过此设置,在服务器端,我可以看到每秒收到大约 60,000 个请求(当客户端在同一台机器上时)。在不同机器上时,每个新客户端都会按预期增加服务器接收到的请求数。

服务器案例 2:这本质上是 ZMQ 指南中的 rrbroker。

   void ReceiveMessages(Context zmqContext, string zmqConnectionString, int numWorkers)
   {
       List<PollItem> pollItemsList = new List<PollItem>();

       routerSocket = zmqContext.Socket(SocketType.ROUTER);
       try
       {
            routerSocket.Bind(zmqConnectionString);
            PollItem pollItem = routerSocket.CreatePollItem(IOMultiPlex.POLLIN);
            pollItem.PollInHandler += RouterSocket_PollInHandler;
            pollItemsList.Add(pollItem);
       }
       catch (ZMQ.Exception ze)
       {
            Console.WriteLine("{0}", ze.Message);
            return;
       }

        dealerSocket = zmqContext.Socket(SocketType.DEALER);
        try
        {
            dealerSocket.Bind("inproc://workers");
            PollItem pollItem = dealerSocket.CreatePollItem(IOMultiPlex.POLLIN);
            pollItem.PollInHandler += DealerSocket_PollInHandler;
            pollItemsList.Add(pollItem);
        }
        catch (ZMQ.Exception ze)
        {
            Console.WriteLine("{0}", ze.Message);
            return;
        }

        // Start the worker pool; cant connect  
        // to inproc socket before binding.
        workerPool.Start(numWorkers);

        while (true)
        {
           zmqContext.Poll(pollItemsList.ToArray());
        }
    }

    void RouterSocket_PollInHandler(Socket socket, IOMultiPlex revents)
    {
        RelayMessage(routerSocket, dealerSocket);
    }

    void DealerSocket_PollInHandler(Socket socket, IOMultiPlex revents)
    {
        RelayMessage(dealerSocket, routerSocket);
    }

    void RelayMessage(Socket source, Socket destination)
    {
        bool hasMore = true;
        while (hasMore)
        {
            byte[] message = source.Recv();
            hasMore = source.RcvMore;
            destination.Send(message, message.Length, hasMore ? SendRecvOpt.SNDMORE : SendRecvOpt.NONE);
        }
    }    

worker pool的start方法在哪里:

   public void Start(int numWorkerTasks=8)
    {   
        for (int i = 0; i < numWorkerTasks; i++)
        {
            QueryWorker worker = new QueryWorker(this.zmqContext);
            Task task = Task.Factory.StartNew(() =>
            worker.Start(),
            TaskCreationOptions.LongRunning);
        }
        Console.WriteLine("Started {0} with {1} workers.", this.GetType().Name, numWorkerTasks);
    }

public class QueryWorker
{
    Context zmqContext;

    public QueryWorker(Context zmqContext)
    {
        this.zmqContext = zmqContext;
    }

    public void Start()
    {
        Socket socket = this.zmqContext.Socket(SocketType.REP);
        try
        {
            socket.Connect("inproc://workers");
        }
        catch (ZMQ.Exception ze)
        {
            Console.WriteLine("Could not create worker, error: {0}", ze.Message);
            return;
        }

        while (true)
        {
            try
            {
                string message = socket.Recv(Encoding.Unicode);
                if (message.CompareTo("ping") == 0)
                {
                    socket.Send("pong", Encoding.Unicode);
                }
            }
            catch (ZMQ.Exception ze)
            {
                Console.WriteLine("Could not receive message, error: " + ze.ToString());
            }
        }
    }
}

【问题讨论】:

    标签: c# performance zeromq


    【解决方案1】:

    您能否发布一些源代码或至少对您的测试用例进行更详细的说明?一般来说,构建您的设计的方法是一次进行一项更改,并在每次更改时进行测量。您始终可以从已知的工作设计逐步转移到更复杂的设计。

    【讨论】:

      【解决方案2】:

      “路由器”很可能是瓶颈。

      查看这些相关问题:

      1. Client maintenance in ZMQ ROUTER
      2. Load testing ZeroMQ (ZMQ_STREAM) for finding the maximum simultaneous users it can handle

      ROUTER(和 ZMQ_STREAM,它只是 ROUTER 的一种变体)必须在内部维护客户端映射,因此 IMO 它可以接受来自特定客户端的有限连接。看起来ROUTER可以多路复用多个客户端,只要每个客户端只有一个活动连接。

      我在这里可能是错的 - 但我没有看到太多相反的证据(简单的工作代码可扩展到具有 ROUTER 或 STREAM 的多连接的多客户端)。

      ZeroMQ 的并发连接肯定有非常严格的限制,尽管看起来没人知道是什么原因造成的。

      【讨论】:

        【解决方案3】:

        我已经完成了使用 C# 中的各种方法调用本机非托管 DLL 函数的性能测试: 1. C++/CLI 包装器 2. 调用 3. ZeroMQ/clrzmq

        最后一个对你来说可能很有趣。

        我在性能测试结束时的发现是,在我尝试优化绑定源代码中的 PInvoke 调用后,使用 ZMQ 绑定 clrzmq 没有用,并且产生了 100 倍的性能开销。因此,我使用了没有绑定但使用 PInvoke 调用的 ZMQ。这些调用必须使用 cdecl 约定和选项“SuppressUnmanagedCodeSecurity”来获得最快的速度。 我只需要导入 5 个相当简单的函数。 最后,速度比 PInvoke 调用慢一点,但在我的情况下,ZMQ 超过了“inproc”。

        如果您对速度感兴趣,这可能会提示您在没有绑定的情况下尝试它。

        这不是您问题的直接答案,但总体上可以帮助您提高性能。

        【讨论】:

          猜你喜欢
          • 2016-11-17
          • 2013-04-10
          • 2013-06-07
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2017-04-13
          相关资源
          最近更新 更多