【发布时间】:2015-01-29 22:53:20
【问题描述】:
考虑到任何消息总线都可以使用 spring-xd 部署,spring-xd 是否提供任何 API 可以根据 xd/config/servers.yml 中的配置写入已部署的消息总线,例如 Kafka/rabbitmq或任何其他这样的地方。
我正在编写一个处理器(itemprocessor),它对数据进行一些处理,编写器现在会将数据写入rabbitmq队列。因为,在当前的部署场景中,可能会部署rabbitmq,也可能不会部署,所以处理器应该能够写入默认的Redis消息总线。我知道我可以使用spring-rabbit提供的apis写入rabbitmq,但这会束缚我的处理器到 RabbitMQ。我正在寻找一种概括它的方法。我试图查看 spring-xd 代码,看看是否有这样的例子。我找到了一个 MessageProcessor 示例,但这是一个流处理器,不知道如何应用它或者我是否走在正确的轨道上。
我刚开始使用 spring-xd,如果已经讨论过,请原谅我的无知。任何指针都非常感谢。
更新
感谢 Gary,根据您的回答,我试用了 spring-integration jms 示例。
我有一个春季批处理作业
<batch:chunk reader="reader" processor="processor" writer="writer" />
我希望将编写器的输出写入任何底层消息总线,首先是 RabiitMQ。所以我根据我在示例中看到的内容添加了以下内容:
<beans:bean id="writer" class="abc" scope="step">
</beans:bean>
<channel id="outputme"/>
<beans:bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
<beans:constructor-arg value="queue.demo"/>
</beans:bean>
<beans:bean id="replyQueue" class="org.apache.activemq.command.ActiveMQQueue">
<beans:constructor-arg value="queue.reply"/>
</beans:bean>
<jms:outbound-gateway request-channel="outputme"
request-destination="requestQueue"
reply-channel="jmsReplyChannel"/>
<channel id="jmsReplyChannel" />
<beans:beans profile="default">
<stream:stdout-channel-adapter channel="jmsReplyChannel" append-newline="true"/>
</beans:beans>
执行此操作时,我看到以下输出,这让我相信某些内容正在写入嵌入式 ActiveMQ 代理。
16:05:42,400 [AbstractApplicationContext] - Closing org.springframework.context.support.GenericApplicationContext@125a6d70: startup date [Tue Feb 03 16:05:40 PST 2015]; root of context hierarchy
16:05:42,401 [DefaultLifecycleProcessor$LifecycleGroup] - Stopping beans in phase 0
16:05:42,402 [EventDrivenConsumer] - Removing {jms:outbound-gateway} as a subscriber to the 'outputme' channel
16:05:42,402 [AbstractSubscribableChannel] - Channel 'org.springframework.context.support.GenericApplicationContext@125a6d70.outputme' has 0 subscriber(s).
16:05:42,402 [AbstractEndpoint] - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#0
16:05:42,402 [EventDrivenConsumer] - Removing {service-activator} as a subscriber to the 'jmsReplyChannel' channel
16:05:42,402 [AbstractSubscribableChannel] - Channel 'org.springframework.context.support.GenericApplicationContext@125a6d70.jmsReplyChannel' has 1 subscriber(s).
16:05:42,402 [AbstractEndpoint] - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#1
16:05:42,402 [EventDrivenConsumer] - Removing {stream:outbound-channel-adapter(character)} as a subscriber to the 'jmsReplyChannel' channel
16:05:42,403 [AbstractSubscribableChannel] - Channel 'org.springframework.context.support.GenericApplicationContext@125a6d70.jmsReplyChannel' has 0 subscriber(s).
16:05:42,403 [AbstractEndpoint] - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#2
但是,当我尝试通过像这样更改 connectionfactory 来使用 RabbitMQ 更改 ActiveMQ 时:
<rabbit:connection-factory id="connectionFactory" />
我收到一条错误消息:
Cannot convert value of type [org.springframework.amqp.rabbit.connection.CachingConnectionFactory] to required type [javax.jms.ConnectionFactory] for property 'connectionFactory'
根据http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd 的架构文件中提到的内容,我理解这一点,因为元素连接工厂所需的类型是 javax.jms.ConnectionFactory。我环顾四周,没有找到为 RabbitMQ 创建 conenctionFactory 的方法,就像我们为 ActiveMQ 创建 ConnectionFactory 一样。
我环顾四周,不知道如何解决这个问题。也许我错过了一些非常基本的东西。我什至不确定这是否是正确的方法。您能否让我知道我缺少什么以及这是否是正确的方法?如果已经讨论过,我提前道歉。
再次感谢您的宝贵时间。
非常感谢您的宝贵时间。
问候,
爱丽丝
【问题讨论】:
标签: spring-integration spring-xd