【问题标题】:How to achieve more than 4000 msg/sec with ZeroMQ?如何使用 ZeroMQ 实现超过 4000 msg/sec?
【发布时间】:2014-04-23 17:43:00
【问题描述】:

根据之前关于 SO 的问题建议我使用 DEALER/ROUTER 模型来最大化性能(而不是 REQ/REP 模型),我设置了以下客户端和服务器代码。

客户端 asynccli.c 源触发 8 个线程,每个线程在 zmq TCP 套接字上发送和接收。服务器 asyncsrv.c 触发 4 个工作线程并使用代理将传入请求分发给工作线程。

对于持续 10 秒的测试,我体验到的性能从 40,000 条消息到 120,000 条不等,最多 12,000 条消息/秒,这是相当低的。我在具有 8GB 内存的 i7(8HT 内核)笔记本电脑上运行 Ubuntu。 我使用 czmq 库。

我认为我可以使用 ZeroMQ 实现 > 200,000 msgs/s。我想我没有正确地抓住“异步”的东西。周围有任何 C 示例代码吗?基本上我看不到如何获得异步的东西,因为我目前正在 zmq_poll() 在这里。

asynccli.c:

// results : 4000/s
#include "czmq.h"
int id = 0;

static void *
client_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *client = zsocket_new (ctx, ZMQ_DEALER);

    char identity [10];

    sprintf (identity, "%d", id);
    zsockopt_set_identity (client, identity);
    zsocket_connect (client, "tcp://localhost:5570");

    zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
    int request_nbr = 0;
    while (true) {
        //  Tick once per second, pulling in arriving messages
        int centitick;
        for (centitick = 0; centitick < 100; centitick++) {
            zmq_poll (items, 1, 1);
            if (items [0].revents & ZMQ_POLLIN) {
                zmsg_t *msg = zmsg_recv (client);
                //zframe_print (zmsg_last (msg), identity);
                zmsg_destroy (&msg);
                break;
            }
        }

        id+=1;
        zstr_send (client, "request #%d", ++request_nbr);
    }
    zctx_destroy (&ctx);
    return NULL;
}

//  The main thread simply starts several clients and a server, and then
//  waits for the server to finish.

int main (void)
{

    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);

    zclock_sleep (10 * 1000);    //  Run for 10 seconds then quit
    printf ("\\ntotal iterations = %d\n" , id );
    return 0;
}

asyncsrv.c:

#include "czmq.h"

static void server_worker (void *args, zctx_t *ctx, void *pipe);

void *server_task (void *args)
{
    //  Frontend socket talks to clients over TCP
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "tcp://*:5570");

    //  Backend socket talks to workers over inproc
    void *backend = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_bind (backend, "inproc://backend");

    //  Launch pool of worker threads, precise number is not critical
    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 3; thread_nbr++)
        zthread_fork (ctx, server_worker, NULL);

    //  Connect backend to frontend via a proxy
    zmq_proxy (frontend, backend, NULL);

    zctx_destroy (&ctx);
    return NULL;
}

static void
server_worker (void *args, zctx_t *ctx, void *pipe)
{
    void *worker = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (worker, "inproc://backend");

    while (true) {
        //  The DEALER socket gives us the reply envelope and message
        zmsg_t *msg = zmsg_recv (worker);
        zframe_t *identity = zmsg_pop (msg);
        zframe_t *content = zmsg_pop (msg);
        assert (content);
        zmsg_destroy (&msg);

        //  Sleep for some fraction of a second
        zframe_send (&identity, worker, ZFRAME_REUSE + ZFRAME_MORE);
        zframe_send (&content, worker, ZFRAME_REUSE);

        zframe_destroy (&identity);
        zframe_destroy (&content);
    }
}

int main (void)
{
    zthread_new (server_task, NULL);
    zclock_sleep (15 * 1000);    //  Run for 15 seconds then quit
    return 0;
}

【问题讨论】:

  • 您基本上仍在使用同步请求回复范例运行,因此 8 个线程 * 5,000 req-rep 速度 = 40,000 msgs/s,这正是您所看到的速度。
  • @Steve-o:我猜是这样,但我不明白为什么:我使用 zmq_poll() 和 DEALER/ROUTER。任何示例代码,以便我可以理解我应该怎么做?

标签: zeromq


【解决方案1】:

你必须在worker中使用与dealer相同的逻辑:

while(1)

zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };

zmq_poll (items, 1, 1);

if (items [0].revents & ZMQ_POLLIN) 

如果没有 if 语句,zmsg_recv() 会阻塞。此外,zmq_pollitem_t 应该在每次迭代时在 while 内重新创建。我认为这是低层次的原因,看看使用 FD_SET 和 select 的原始套接字,它可能会给你一个线索...... .

另外,这也是不正确的:

for (centitick = 0; centitick < 100; centitick++) {

如果您只想测量 100 次迭代,请使用一些计数器。

【讨论】:

    【解决方案2】:

    问题是:客户端的发送能力受到“限制”,因为在您的代码中“发送”之前有“读取”。

    现在,客户端中的代码是:

    while(true)
    {
      pull_any_income_messages()
      send()
    }
    

    这将严重限制客户端在任何未决传入消息的情况下发送任何内容。所以,这本质上变成了一个请求-回复模式。

    要扩展这一点,您必须将“拉入消息”和“发送”部分解耦。一种方法是,不让通用客户端线程同时处理发送和接收,而是为客户端创建两个单独的线程类型,一个专门发送,另一个专门读取。

    另一种方法是进行“基于信用的流量控制”。 ZMQ 指南第 7 章有相关信息 (http://zguide.zeromq.org/page:all#Chapter-Advanced-Architecture-using-MQ)。

    -GK

    http://gk.palem.in/

    【讨论】:

      猜你喜欢
      • 2015-12-08
      • 1970-01-01
      • 1970-01-01
      • 2012-07-11
      • 1970-01-01
      • 1970-01-01
      • 2014-08-28
      • 2013-12-10
      • 1970-01-01
      相关资源
      最近更新 更多