【问题标题】:How can i properly distribute data into two consumers?我怎样才能正确地将数据分配给两个消费者?
【发布时间】:2017-07-27 04:55:10
【问题描述】:

在我的应用程序中,我正在使用活动 mq,在消费者端,我正在运行两个实例(消费者)来处理请求。由于两个实例正在侦听同一个队列,因此发生了一些冲突。如果多次收到相同的数据,一个请求处理了多次,为了克服这个问题并沟通两个实例,我已经实现了 hazelcast,它运行良好,但有时数据没有正确分布到两个实例中,如果我只发送一个实例的批量数据正在处理所有任务。

我在生产者端使用的代码。

public synchronized static void createMQRequestForPoster(Long zoneId, List<String> postJobs, int priority) throws JMSException {

    Connection connection = null;
    try {
        connection = factory.createConnection();

        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("customQueue");
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        logger.info("List of jobs adding into poster queue: "+postJobs.size());
        for(String str : postJobs) {

            TextMessage message = session.createTextMessage();
            JSONObject obj = new JSONObject();
            obj.put("priority", priority);
            obj.put("zoneId", zoneId);
            obj.put("postJobs", str);
            logger.debug(obj.toString());
            message.setText(obj.toString());
            message.setIntProperty(ActiveMQReqProcessor.REQ_TYPE, 0);
            producer.send(message);
        }
    } catch (JMSException | JSONException e) {
        logger.warn("Failed to add poster request to ActiveMq", e);
    } finally {
        if(connection != null)
            connection.close();
    }
}

我在消费者端使用的代码。

私有静态 void activeMQPostProcessor() {

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(AppCoding.NZ_JMS_URL);
    Connection connection = null;
    try {
        connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("customQueue");


        MessageConsumer consumer = session.createConsumer(queue);
        MessageListener listener = new MessageListener() {

            @Override
            public void onMessage(Message message) {

                if (message instanceof TextMessage) {

                    try {
                        TextMessage textMessage = (TextMessage) message;
                        logger.info("Received message " + textMessage.getText());
                        JSONObject jsonObj = new JSONObject(textMessage.getText());
                        HazelcastClusterInstance.getInstance().add(processOnPoster(jsonObj));
                        message.acknowledge();
                    } catch (JSONException e) {

                    } catch (JMSException e) {

                    }

                    logger.info("Adding Raw Message to Internal Queue");
                }
            }
        };
        consumer.setMessageListener(listener);
        logger.info("Waiting for new posts from selector, scheduler.");

    } catch (JMSException e) {
        logger.info(e);
    }
}

我只在一段时间内观察到这种情况。我该如何处理?

【问题讨论】:

    标签: java activemq hazelcast


    【解决方案1】:

    您很可能会看到预取消息的不平衡。消费者连接并请求一个预取块,第一个往往会在第二个进入之前获得大批量,因此它似乎占用了消息。

    您需要针对您的用例调整预取,请参阅prefetch documentation

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-06-21
      • 2021-08-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-06-15
      • 2021-09-24
      相关资源
      最近更新 更多