【问题标题】:Apache Camel - Special characters in message bodyApache Camel - 消息正文中的特殊字符
【发布时间】:2020-03-31 00:41:54
【问题描述】:

我正在使用 Camel 3.1 版并尝试使用 AMQP Component 和 Spring 引导将消息从一个 ActiveMQ 服务器发送到另一个服务器。发送消息后,在目标 ActiveMQ 控制台上,消息详细信息具有以下内容。

Sp�ASr�)�x-opt-jms-destQ�x-opt-jms-msg-typeQSs�^
�/ID:53e1ce3a-4drf-4f8a-9ff9-845fe0d0006e:3:1:1-1@�queue://testcamelwithamqp@@@@@@�qd�Sw�1:"test message"

我的实际消息是1:"test message",但不知何故,JMS 标头作为特殊字符放入消息正文中。有什么解决办法吗?

下面是spring boot代码示例

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableAutoConfiguration
@SpringBootApplication
public class CamelMQApplication {

    public static void main(String[] args) {
        SpringApplication.run(CamelMQApplication.class, args);
    }
}
import org.apache.camel.CamelContext;
import org.apache.camel.component.amqp.AMQPComponent;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpConfigInternal {

    @Value("${INTERNAL_SERVICE_USERNAME}")
    private String userName;
    @Value("${INTERNAL_SERVICE_PASSWORD}")
    private String pass;
    @Value("${INTERNAL_REMOTE_URI}")
    private String remoteUri;

    @Autowired
    private CamelContext camelInternal;

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPass() {
        return pass;
    }

    public void setPass(String pass) {
        this.pass = pass;
    }

    public String getRemoteUri() {
        return remoteUri;
    }

    public void setRemoteUri(String remoteUri) {
        this.remoteUri = remoteUri;
    }

    private JmsConnectionFactory internalConnectionFactory() throws Exception {
        JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory();
        jmsConnectionFactory.setRemoteURI(remoteUri);
        jmsConnectionFactory.setUsername(userName);
        jmsConnectionFactory.setPassword(pass);
        return jmsConnectionFactory;
    }

    @Bean
    public AMQPComponent internalAmqpConnection() throws Exception {
        AMQPComponent amqp = new AMQPComponent();
        amqp.setConnectionFactory(internalConnectionFactory());
        camelInternal.addComponent("amqpInternal", amqp);
        return amqp;
    }

}

import org.apache.camel.ExchangePattern;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class SampleAutowiredAmqpRouteTest extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("{{route1.from}}")
        .convertBodyTo(String.class, "UTF-8")
        .removeHeaders("*")
        .log("From ActiveMQ: ${body}")
        .to("{{route1.to}}")
        .convertBodyTo(String.class, "UTF-8")
        .removeHeaders("*");

    }

}
import org.apache.camel.CamelContext;
import org.apache.camel.component.amqp.AMQPComponent;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpConfigRouteTest {

    @Value("${ACTIVEMQ_SERVICE_USERNAME}")
    private String userName;
    @Value("${ACTIVEMQ_SERVICE_PASSWORD}")
    private String pass;
    @Value("${ACTIVEMQ_REMOTE_URI}")
    private String remoteUri;

    @Autowired private CamelContext camelRouteTest;

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPass() {
        return pass;
    }

    public void setPass(String pass) {
        this.pass = pass;
    }

    public String getRemoteUri() {
        return remoteUri;
    }

    public void setRemoteUri(String remoteUri) {
        this.remoteUri = remoteUri;
    }

    private JmsConnectionFactory amqp1ConnectionFactory() throws Exception {
        JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory();
        jmsConnectionFactory.setRemoteURI(remoteUri);
        jmsConnectionFactory.setUsername(userName);
        jmsConnectionFactory.setPassword(pass);
        return jmsConnectionFactory;
    }

    @Bean
    public AMQPComponent amqp1Connection() throws Exception {
        AMQPComponent amqp = new AMQPComponent();
        amqp.setConnectionFactory(amqp1ConnectionFactory());
        camelRouteTest.addComponent("amqpRouteTest", amqp);
        return amqp;
    }
}
application.properties

server.port=8071
camel.springboot.name = CamelTest
camel.springboot.main-run-controller = true

INTERNAL_REMOTE_URI=amqp://actimqserver1:12345
INTERNAL_SERVICE_USERNAME=admin
INTERNAL_SERVICE_PASSWORD=admin

ACTIVEMQ_REMOTE_URI=amqp://actimqserver2:12345
ACTIVEMQ_SERVICE_USERNAME=admin
ACTIVEMQ_SERVICE_PASSWORD=admin

route1.from = amqpInternal:test4camelamqpSrcQ
route1.to = amqpRouteTest:test4camelamqpTgtQ


【问题讨论】:

  • 如果您实际使用消息而不是查看控制台中的内容会怎样?可能数据已损坏,或者控制台的显示逻辑可能存在错误。
  • 您能否提供更多信息,例如您正在使用的代码?
  • @JustinBertram,我尝试使用 @JmsListener 获取消息,并且在 jms 消息输出中看到相同的 spl 字符。

标签: apache-camel activemq


【解决方案1】:

解决方法:我们需要在ActiveMQ的配置文件中添加transport.transformer="jms",如下图,然后重启Active MQ

<transportConnector name="amqp" uri="amqp://localhost:5672?**transport.transformer=jms**"/>

这实际上会让 ActiveMQ 正确映射传入的 JMS 消息头。 'transport.transformer=native' 的默认值将 AMQP 消息包装到 JMS BytesMessage 中,因此它既没有正确显示在控制台中,也没有在客户端实际使用消息时显示。

【讨论】:

    【解决方案2】:

    您正在使用 ActiveMQ 和 AMQP 协议。你也有这个协议的configured the broker 吗?

    例如传输连接器:

    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672"/>
    

    说实话我不知道当你使用普通的 ActiveMQ 代理(默认协议是 OpenWire)并向它发送 AMQP 消息时会发生什么

    在您的情况下,Camel 似乎能够正确地在队列中生成和使用消息。这并不奇怪,因为生产者和消费者都针对 AMQP 进行了配置。代理只存储消息。

    另一方面,您写道 Springs JmsListener 会在消费时弄乱消息。可能是因为它不需要 AMQP 消息,而您尝试接收 JMS 消息(TextMessage、ByteMessage 等)。

    您可以尝试简单地接收 Message&lt;?&gt;,它是 Spring 对多种传输的抽象

    消息在 ActiveMQ 控制台中被弄乱并不奇怪。我猜 AMQP 消息具有特定格式,控制台无法正确显示,因为它与 JMS 消息不同。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-01-30
      • 2018-03-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多