【问题标题】:Slow HornetQ Producer when Queue is persistent当队列持久时,HornetQ Producer 变慢
【发布时间】:2011-11-10 08:58:21
【问题描述】:

我已经尝试在 horntQ 中使用持久队列。我做了两个单独的例子(生产者,消费者)。我的消费者运行良好,但生产者花费了太多时间来完成发送消息。我已经分别运行以及一起运行。可能是什么问题呢? 我的代码是:

public  class HornetProducer implements Runnable{

    Context ic = null;
    ConnectionFactory cf = null;
    Connection connection =  null;
    Queue queue = null;
    Session session = null;
    MessageProducer publisher =  null;
    TextMessage message = null;
    int messageSent=0;

     public synchronized static Context getInitialContext()throws javax.naming.NamingException {

            Properties p = new Properties( );
            p.put(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory");
            p.put(Context.URL_PKG_PREFIXES," org.jboss.naming:org.jnp.interfaces");
            p.put(Context.PROVIDER_URL, "jnp://localhosts:1099");

            return new javax.naming.InitialContext(p);
        }  

    public HornetProducer()throws Exception{            

        ic = getInitialContext();
        cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
        queue = (Queue)ic.lookup("queue/testQueue2");
        connection = cf.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        
        publisher = session.createProducer(queue);
        connection.start();

    }

    public void publish(){      
        try{        

            message = session.createTextMessage("Hello!");
            System.out.println("StartDate: "+new Date());

            for(int i=0;i<10000;i++){                   
                 messageSent++;              
                 publisher.send(message);                
            }
            System.out.println("EndDate: "+new Date());
        }catch(Exception e){
            System.out.println("Exception in Consume: "+ e.getMessage());
        }           
    }

    public void run(){
         publish();
    }

    public static void main(String[] args) throws Exception{

        new HornetProducer().publish();    
    }

}

【问题讨论】:

    标签: java jboss queue hornetq


    【解决方案1】:

    您正在持续发送这些消息,并且是非事务性的。这意味着,发送的每条消息都必须单独完成。

    这意味着对于您发送的每条消息,您必须与服务器进行一次网络往返,并等待它完成持久性,然后才能发送另一条消息。

    如果您在这种情况下有多个生产者,hornetq 会批量处理这两个生产者,您将节省大量时间。 (即服务器将批处理许多写入请求)。

    如果你想加快单个生产者的发送速度,你应该使用事务。

    例如:

    I - 将您的会话更改为已交易:

    session = connection.createSession(true, Session.SESSION_TRANSACTIONED); 
    

    II - 每 N 条消息提交一次:

       for(int i=0;i<10000;i++){                   
             messageSent++;              
             publisher.send(message);  
             if (messageSent % 1000 == 0) session.commit();              
        }
        session.commit();
    

    您还可以禁用持久消息的同步。 (异步发送)。

    【讨论】:

    • 感谢您的回复。那么哪种方法更好:Journal-sync to false 或按照您的建议进行每个会话批处理。假设只有一个会话并且所有生产者同步使用同一个会话?
    • 这一切都取决于您的需求。将 journal-sync 设置为 false 将使系统停止同步,如果发生故障,您最终可能会丢失数据。最好的方法是批量交易......在一次提交上发送多条消息。在一次提交上使用多条消息(ack)。
    • 这是否适用于嵌入式和核心 api(不使用 jms)?我没有连接对象,只有 ClientSessionFactory。我看到了一个方法 createTransactedSession(),这就是我要使用的方法吗?
    • 它确实......只需将这些内容转换为 CoreAPI,您就拥有相同的语义。
    猜你喜欢
    • 2011-11-09
    • 2011-02-21
    • 2015-11-21
    • 2023-04-05
    • 2015-05-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-08-26
    相关资源
    最近更新 更多