【问题标题】:Uneven flow in ZMQZMQ 中的不均匀流
【发布时间】:2013-02-25 21:03:28
【问题描述】:

我在我的 Java 应用程序中使用 ZMQ。我发现它的行为不均匀,即如果我向一位消费者发送大约 100 条消息说需要 1 秒,那么如果我们继续增加消费者,则所用时间变为 2,1.5,3 这样。没有逐渐增加或减少。我该如何纠正这个。在下面查找我的代码

// Broker

import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; import org.zeromq.ZMQStreamer;

public class Broker {

/**
 * @param args
 */
public static void main(String[] args) 
{
    Context context = ZMQ.context(1);

    Socket frontEnd = context.socket(ZMQ.PULL);
    frontEnd.bind("tcp://*:5555");

    Socket backEnd = context.socket(ZMQ.PUSH);
    backEnd.bind("tcp://*:5560");

    ZMQStreamer zmqStreamer = new ZMQStreamer(context, frontEnd, backEnd);
    zmqStreamer.run();
}

}

// Producer

import org.zeromq.ZMQ; import org.zeromq.ZMQ.Socket;

public class Producer {

public void init()
{
    ZMQ.Context context = ZMQ.context(1);
    socket = context.socket(ZMQ.PUSH);
    socket.connect("tcp://localhost:5555");
}

public void initMessage(String message)
{
    this.message = message;
}

public void sendMessage()
{
    String sendMessage = System.nanoTime() +"#"+ message;
    socket.send(sendMessage.getBytes(), 0);
}
/**
 * @param args
 */
public static void main(String[] args) 
{
    Producer producer = new Producer();
    producer.init();
    byte[] message = new byte[Integer.parseInt(args[0])];
    //message = "Hello".getBytes();
    producer.initMessage(new String(message));
    for(int i=0;i<100;i++)
    {
        producer.sendMessage();
    }
}

private Socket socket = null;
private String message;
}

//Consumer

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;

public class Consumer 
{

public void init()
{
    ZMQ.Context context = ZMQ.context(1);
    socket = context.socket(ZMQ.PULL);
    socket.connect("tcp://localhost:5560");
}

public void reciveMessage()
{
    byte[] recived = socket.recv(0);
    //System.out.println(recived.length);
    long recivedTime = System.nanoTime();
    String message = new String(recived);
    String[] splitMessage = message.split("#");
    long sendTime = Long.parseLong(splitMessage[0]);
    System.out.println("Send Time " + sendTime + " RecivedTime "
            + recivedTime + " Time taken " + (recivedTime - sendTime)
            + " Message " + message);
}
/**
 * @param args
 */
public static void main(String[] args) 
{
    Consumer consumer = new Consumer();
    consumer.init();
    for (int i=0;i<100;i++)
    {
        consumer.reciveMessage();
    }
}
private Socket socket = null;
}

【问题讨论】:

  • Schildmeijer:我附上了代码。请通过它。
  • 我知道这不是建设性的,但你的问题的标题让我想起了珍珠酱!
  • @g19fanatic: 谢谢你的建议

标签: java message-queue zeromq


【解决方案1】:

要可靠地为一段多线程代码计时,您将需要有某种方法来同步收集器/接收器的开始和结束时间(您目前尚未显示为已编程)。

查看 ZMQ 指南中的 this example,其中指出以下过程是划分数据集的正确 ZMQ 方法之一:

...

我们的超级计算应用程序是一个相当典型的并行处理模型:

We have a ventilator that produces tasks that can be done in parallel.
We have a set of workers that process tasks.
We have a sink that collects results back from the worker processes. 

...

【讨论】:

    猜你喜欢
    • 2012-07-10
    • 2021-11-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-04-04
    • 2021-10-01
    • 2019-08-17
    • 2018-07-22
    相关资源
    最近更新 更多