【发布时间】:2014-10-24 04:10:49
【问题描述】:
使用 WSO2 ESB,我正在尝试将 AMQP 消息发送到 WSO2 消息代理。此 AMQP 消息应具有 JSON 内容类型。
我在 WSO2 ESB 中公开了一个代理服务,它获取消息、将其转换为 JSON 并将其发送到消息代理。代理服务配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse" name="json_sample" transports="http" statistics="disable" trace="disable" startOnLoad="true">
<target>
<inSequence>
<property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
<property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
<property name="messageType" value="application/json" scope="axis2" type="STRING"/>
<log level="full"/>
<header name="To"
value="jms:/sample-queue?transport.jms.ConnectionFactoryJNDIName=sampleConnectionFactory&java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&java.naming.provider.url=repository/conf/jndi.properties&transport.jms.DestinationType=queue"/>
<send/>
</inSequence>
</target>
<publishWSDL>
<definitions name="JsonSample" xmlns="http://schemas.xmlsoap.org/wsdl/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/" xmlns:tns="http://www.examples.com/wsdl/JsonSample.wsdl" targetNamespace="http://www.examples.com/wsdl/JsonSample.wsdl">
<message name="JsonSampleRequest">
<part name="prop1" type="xsd:string"/>
<part name="prop2" type="xsd:string"/>
</message>
<portType name="JsonSample_PortType">
<operation name="jsonObject">
<input message="tns:JsonSampleRequest"/>
</operation>
</portType>
<binding name="JsonSample_Binding" type="tns:JsonSample_PortType">
<soap:binding style="rpc" transport="http://schemas.xmlsoap.org/soap/http"/>
<operation name="jsonObject">
<soap:operation soapAction="jsonObject"/>
<input>
<soap:body encodingStyle="http://schemas.xmlsoap.org/soap/encoding/" namespace="urn:examples:jsonsample" use="encoded"/>
</input>
</operation>
</binding>
<service name="JsonSample_Service">
<port binding="tns:JsonSample_Binding" name="JsonSample_Port">
<soap:address location="http://www.examples.com/JsonSample/"/>
</port>
</service>
</definitions>
</publishWSDL>
<description/>
</proxy>
此代理服务运行良好,将消息转换为 JSON,但不要将 AMQP 消息内容类型设置为 JSON。该消息被视为文本。
当我尝试使用此代码以编程方式在消息代理中注入 AMQP 消息时,我确实收到了 JSON 内容类型的消息:
import java.util.Properties;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.wso2.andes.client.message.JMSTextMessage;
public class StackOverflowSample {
private static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX = "connectionfactory.";
private static final String CF_NAME = "qpidConnectionfactory";
private static final String CARBON_CLIENT_ID = "carbon";
private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
private static final String host = "localhost";
private static final int port = 5675;
private static final String userName = "admin";
private static final String password = "admin";
private static final String queueName = "sample-queue";
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
InitialContext ctx = new InitialContext(properties);
// Lookup connection factory
QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
QueueConnection queueConnection = connFactory.createQueueConnection();
queueConnection.start();
QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
// Send message
Queue queue = queueSession.createQueue(queueName);
// create the message to send
JMSTextMessage textMessage = (JMSTextMessage) queueSession
.createTextMessage("{\"prop1\":\"value1\",\"prop2\":\"value2\"}");
textMessage.setContentType("application/json");
javax.jms.QueueSender queueSender = queueSession.createSender(queue);
queueSender.send(textMessage);
queueSender.close();
queueSession.close();
queueConnection.close();
}
private static String getTCPConnectionURL(String username, String password) {
// amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
return new StringBuffer().append("amqp://").append(username).append(":").append(password).append("@")
.append(CARBON_CLIENT_ID).append("/").append(CARBON_VIRTUAL_HOST_NAME).append("?brokerlist='tcp://")
.append(host).append(":").append(port).append("'").toString();
}
}
将 textMessage.setContentType("application/json"); 更改为 textMessage.setContentType("text/plain"); 与使用 ESB 的结果相同。
所以问题是:如何配置 ESB 以将 AMQP 消息内容类型设置为 JSON?
谢谢
【问题讨论】: