【问题标题】:Is there an API in spring-xd to write to a message bus?spring-xd 中是否有 API 可以写入消息总线?
【发布时间】:2015-01-29 22:53:20
【问题描述】:

考虑到任何消息总线都可以使用 spring-xd 部署,spring-xd 是否提供任何 API 可以根据 xd/config/servers.yml 中的配置写入已部署的消息总线,例如 Kafka/rabbitmq或任何其他这样的地方。

我正在编写一个处理器(itemprocessor),它对数据进行一些处理,编写器现在会将数据写入rabbitmq队列。因为,在当前的部署场景中,可能会部署rabbitmq,也可能不会部署,所以处理器应该能够写入默认的Redis消息总线。我知道我可以使用spring-rabbit提供的apis写入rabbitmq,但这会束缚我的处理器到 RabbitMQ。我正在寻找一种概括它的方法。我试图查看 spring-xd 代码,看看是否有这样的例子。我找到了一个 MessageProcessor 示例,但这是一个流处理器,不知道如何应用它或者我是否走在正确的轨道上。

https://github.com/spring-projects/spring-xd/blob/master/spring-xd-rxjava/src/test/java/org/springframework/xd/rxjava/PongMessageProcessor.java

我刚开始使用 spring-xd,如果已经讨论过,请原谅我的无知。任何指针都非常感谢。

更新

感谢 Gary,根据您的回答,我试用了 spring-integration jms 示例。

我有一个春季批处理作业

<batch:chunk reader="reader" processor="processor" writer="writer" /> 

我希望将编写器的输出写入任何底层消息总线,首先是 RabiitMQ。所以我根据我在示例中看到的内容添加了以下内容:

 <beans:bean id="writer" class="abc" scope="step">
</beans:bean>

<channel id="outputme"/>

<beans:bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <beans:constructor-arg value="queue.demo"/>
</beans:bean>

<beans:bean id="replyQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <beans:constructor-arg value="queue.reply"/>
</beans:bean>

<jms:outbound-gateway request-channel="outputme"
                      request-destination="requestQueue"
                      reply-channel="jmsReplyChannel"/>

<channel id="jmsReplyChannel" />

<beans:beans profile="default">

    <stream:stdout-channel-adapter channel="jmsReplyChannel" append-newline="true"/>

</beans:beans>

执行此操作时,我看到以下输出,这让我相信某些内容正在写入嵌入式 ActiveMQ 代理。

16:05:42,400 [AbstractApplicationContext] - Closing org.springframework.context.support.GenericApplicationContext@125a6d70: startup date [Tue Feb 03 16:05:40 PST 2015]; root of context hierarchy
16:05:42,401 [DefaultLifecycleProcessor$LifecycleGroup] - Stopping beans in phase 0
16:05:42,402 [EventDrivenConsumer] - Removing {jms:outbound-gateway} as a subscriber to the 'outputme' channel
16:05:42,402 [AbstractSubscribableChannel] - Channel 'org.springframework.context.support.GenericApplicationContext@125a6d70.outputme' has 0 subscriber(s).
16:05:42,402 [AbstractEndpoint] - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#0
16:05:42,402 [EventDrivenConsumer] - Removing {service-activator} as a subscriber to the 'jmsReplyChannel' channel
16:05:42,402 [AbstractSubscribableChannel] - Channel 'org.springframework.context.support.GenericApplicationContext@125a6d70.jmsReplyChannel' has 1 subscriber(s).
16:05:42,402 [AbstractEndpoint] - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#1
16:05:42,402 [EventDrivenConsumer] - Removing {stream:outbound-channel-adapter(character)} as a subscriber to the 'jmsReplyChannel' channel
16:05:42,403 [AbstractSubscribableChannel] - Channel 'org.springframework.context.support.GenericApplicationContext@125a6d70.jmsReplyChannel' has 0 subscriber(s).
16:05:42,403 [AbstractEndpoint] - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#2

但是,当我尝试通过像这样更改 connectionfactory 来使用 RabbitMQ 更改 ActiveMQ 时:

<rabbit:connection-factory id="connectionFactory" />

我收到一条错误消息:

 Cannot convert value of type [org.springframework.amqp.rabbit.connection.CachingConnectionFactory] to required type [javax.jms.ConnectionFactory] for property 'connectionFactory'

根据http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd 的架构文件中提到的内容,我理解这一点,因为元素连接工厂所需的类型是 javax.jms.ConnectionFactory。我环顾四周,没有找到为 RabbitMQ 创建 conenctionFactory 的方法,就像我们为 ActiveMQ 创建 ConnectionFactory 一样。

我环顾四周,不知道如何解决这个问题。也许我错过了一些非常基本的东西。我什至不确定这是否是正确的方法。您能否让我知道我缺少什么以及这是否是正确的方法?如果已经讨论过,我提前道歉。

再次感谢您的宝贵时间。


非常感谢您的宝贵时间。

问候,

爱丽丝

【问题讨论】:

    标签: spring-integration spring-xd


    【解决方案1】:

    MessageBus SPI 非常适用于 XD 模块内通信;它不是为任意应用程序级消息传递而设计的。

    也就是说,XD(及其消息总线实现)广泛使用Spring Integration project

    该项目提供了您需要的抽象。您可以发送到消息通道(使用MessagingGatewayMessagingTemplate,并且在该通道的下游,您可以连接任何类型的通道适配器(rabbit [amqp]、redis 等)。

    因此,您的项目处理器与接收消息的实际技术分离。

    查看 Spring Integration 参考手册(项目页面上有链接)。

    【讨论】:

    • 谢谢加里,我试过这个。不用说,遇到了很多新手问题。我用我的发现更新了我的问题。再次感谢您的宝贵时间。
    • RabbitMQ 不是 JMS 提供者;这是一个完全不同的协议。 Pivotal 确实有一个用于rabbitmq 的商业JMS 客户端,但是如果您想使用本机rabbitmq (amqp),则需要将int-jms:outbound-gateway 更改为int-amqp:outbound-gateway。见the spring integration documentation
    • 谢谢加里,这有帮助。我添加了以下rabbitmq配置并将命名交换添加到出站网关。这应该使所有消息都出现在 RabbitMQ 中。我还没有看到它,可能是因为我正在作为测试运行。我将尝试作为实际应用程序运行。 ... ... '
    • 您需要&lt;rabbit:admin/&gt; 来声明队列等(或在代理中手动设置)。
    • 感谢加里的帮助。我确实有 rabbit:admin,但是我通过调用处理器两次(对于 2 个单独的执行路径)在配置中搞砸了一些事情。我只是把它分开,它开始像魅力一样工作。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-07-06
    • 1970-01-01
    • 2021-06-16
    • 2019-02-17
    • 1970-01-01
    相关资源
    最近更新 更多