【问题标题】:Get Hawtio to show more than 255 chars in JMS messages让 Hawtio 在 JMS 消息中显示超过 255 个字符
【发布时间】:2015-09-01 17:55:29
【问题描述】:

我正在使用 Hawtio 浏览我的 ActiveMQ 队列。我还希望能够在将 JMS 消息重新发送到另一个队列之前对其进行编辑。

我不知道如何在 Hawtio 中编辑消息,但没关系,我想直接在代理中修改消息是不合法的。

相反,我会复制邮件正文并发送修改正文的新邮件。现在,我面临的问题是我只能看到消息正文的前 255 个字符。如何在 hawtio 中查看整个 ActiveMQ 消息?不仅仅是前 255 个字符。

【问题讨论】:

    标签: jms apache-camel activemq hawtio


    【解决方案1】:

    Hawtio 使用 JMX 接口浏览队列。它调用队列上的browse() 方法。它将消息返回为CompositedData[]

    ActiveMQBytesMessage 被转换(检查类org.apache.activemq.broker.jmx.OpenTypeSupport.ByteMessageOpenTypeFactory)时,会添加两个字段BodyLengthBodyPreview。这些字段返回以下数据。

    • BodyLength - JMS 消息体的长度
    • BodyPreview - JMS 消息正文的前 255 个字节(硬编码的长度,正如 Claus Ibsen 在他的回答中已经说过的那样;-))

    签入类org.apache.activemq.broker.jmx.OpenTypeSupport.ByteMessageOpenTypeFactory方法Map<String, Object> getFields(Object o)

    Hawtio 使用字段BodyPreview 显示消息,用于非文本消息。

    签入 Hawtio 文件hawtio-web/src/main/webapp/app/activemq/js/browse.ts

    function createBodyText(message) {
        if (message.Text) {
            ...
        } else if (message.BodyPreview) {
            ...
            if (code === 1 || code === 2) {
                // bytes and text
                var len = message.BodyPreview.length;
                var lenTxt = "" + textArr.length;
                body = "bytes:\n" + bytesData + "\n\ntext:\n" + textData;
                message.textMode = "bytes (" + len + " bytes) and text (" + lenTxt + " chars)";
            } else {
                // bytes only
                var len = message.BodyPreview.length;
                body = bytesData;
                message.textMode = "bytes (" + len + " bytes)";
            }
            ...
        } else {
            message.textMode = "unsupported";
            ...
    

    如果您想更改它,您必须在ActiveMQHawtio 中更改它。

    一个冗长的例子来说明解释。

    import static java.lang.System.out;
    import java.lang.management.ManagementFactory;
    import java.util.Enumeration;
    import java.util.concurrent.TimeUnit;
    import javax.jms.BytesMessage;
    import javax.jms.Connection;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.QueueBrowser;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.management.MBeanServer;
    import javax.management.MBeanServerInvocationHandler;
    import javax.management.ObjectName;
    import javax.management.openmbean.CompositeData;
    import javax.management.openmbean.CompositeType;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.broker.BrokerFactory;
    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.broker.jmx.QueueViewMBean;
    import org.apache.activemq.command.ActiveMQBytesMessage;
    import org.apache.activemq.command.ActiveMQTextMessage;
    
    public class BodyPreviewExample {
    
        public static void main(String[] args) throws Exception {
            String password = "password";
            String user = "user";
            String queueName = "TEST_QUEUE";
            String brokerUrl = "tcp://localhost:61616";
    
            BrokerService broker = BrokerFactory.createBroker("broker:"+brokerUrl);
            broker.start();
            broker.waitUntilStarted();
    
            Connection conn = new ActiveMQConnectionFactory(brokerUrl)
                    .createConnection(user, password);
            conn.start();
    
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue producerQueue = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(producerQueue);
    
            // create a dummy message
            StringBuilder sb = new StringBuilder(1000);
            for (int i = 0; i < 100; i++) {
                sb.append(">CAFEBABE<");
            }
    
            // create and send a JMSBytesMessage
            BytesMessage bytesMsg = session.createBytesMessage();
            bytesMsg.writeBytes(sb.toString().getBytes());
            producer.send(bytesMsg);
    
            // create and send a JMSTextMessage
            TextMessage textMsg = session.createTextMessage();
            textMsg.setText(sb.toString());
            producer.send(textMsg);
    
            producer.close();
    
            out.printf("%nmessage info via session browser%n");
            String format = "%-20s = %s%n";
            Queue consumerQueue = session.createQueue(queueName);
            QueueBrowser browser = session.createBrowser(consumerQueue);
            for (Enumeration p = browser.getEnumeration(); p.hasMoreElements();) {
                out.println();
                Object next = p.nextElement();
                if (next instanceof ActiveMQBytesMessage) {
                    ActiveMQBytesMessage amq = (ActiveMQBytesMessage) next;
                    out.printf(format, "JMSMessageID", amq.getJMSMessageID());
                    out.printf(format, "JMSDestination", amq.getJMSDestination());
                    out.printf(format, "JMSXMimeType", amq.getJMSXMimeType());
                    out.printf(format, "BodyLength", amq.getBodyLength());
                } else if (next instanceof ActiveMQTextMessage) {
                    ActiveMQTextMessage amq = (ActiveMQTextMessage) next;
                    out.printf(format, "JMSMessageID", amq.getJMSMessageID());
                    out.printf(format, "JMSDestination", amq.getJMSDestination());
                    out.printf(format, "JMSXMimeType", amq.getJMSXMimeType());
                    out.printf(format, "text.length", amq.getText().length());
                } else {
                    out.printf("unhandled message type: %s%n", next.getClass());
                }
            }
            session.close();
            conn.close();
    
            // access the queue via JMX
            out.printf("%nmessage info via JMX browse operation%n");
            MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName("org.apache.activemq:type=Broker"
                    + ",brokerName=localhost"
                    + ",destinationType=Queue"
                    + ",destinationName=" + queueName);
            QueueViewMBean queue
                    = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
                            name, QueueViewMBean.class, true);
            CompositeData[] browse = queue.browse();
            for (CompositeData compositeData : browse) {
                out.println();
                CompositeType compositeType = compositeData.getCompositeType();
                out.printf(format, "CompositeType", compositeType.getTypeName());
                out.printf(format,"JMSMessageID",compositeData.get("JMSMessageID"));
                if (compositeData.containsKey("BodyLength")) {
                    // body length of the ActiveMQBytesMessage
                    Long bodyLength = (Long) compositeData.get("BodyLength");
                    out.printf(format, "BodyLength", bodyLength);
                    // the content displayed by hawtio
                    Byte[] bodyPreview = (Byte[]) compositeData.get("BodyPreview");
                    out.printf(format, "size of BodyPreview", bodyPreview.length);
                } else if (compositeData.containsKey("Text")) {
                    String text = (String) compositeData.get("Text");
                    out.printf(format, "Text.length()", text.length());
                }
            }
            // uncomment if you want to check with e.g. JConsole
            // TimeUnit.MINUTES.sleep(5);
            broker.stop();
        }
    }
    

    示例输出

    message info via session browser
    
    JMSMessageID         = ID:hostname-50075-1467979678722-3:1:1:1:1
    JMSDestination       = queue://TEST_QUEUE
    JMSXMimeType         = jms/bytes-message
    BodyLength           = 1000
    
    JMSMessageID         = ID:hostname-50075-1467979678722-3:1:1:1:2
    JMSDestination       = queue://TEST_QUEUE
    JMSXMimeType         = jms/text-message
    text.length          = 1000
    
    message info via JMX browse operation
    
    CompositeType        = org.apache.activemq.command.ActiveMQBytesMessage
    JMSMessageID         = ID:hostname-50075-1467979678722-3:1:1:1:1
    BodyLength           = 1000
    size of BodyPreview  = 255
    
    CompositeType        = org.apache.activemq.command.ActiveMQTextMessage
    JMSMessageID         = ID:hostname-50075-1467979678722-3:1:1:1:2
    Text.length()        = 1000
    

    【讨论】:

    • @Solo How do you implement it in a Java code? 是什么意思?该示例使用 Java。
    【解决方案2】:

    我认为当您使用 hawtio 使用的 JMX API 查询和浏览队列时,ActiveMQ 存在硬编码限制。但不记得它是否只有 255 个字节。

    查看hawtio设置,可能有一个ActiveMQ插件设置可以更改255个字符,也不记得了;)

    【讨论】:

    • 我在 hawtio 中找不到任何将输出限制为 255 的设置。我想让我的用户能够从 ActiveMQ 队列中手动复制整个消息。你建议我用什么工具?
    猜你喜欢
    • 2015-10-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-05-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多