【问题标题】:Generating a reply when using an outbound channel adapter使用出站通道适配器时生成回复
【发布时间】:2015-06-26 06:33:11
【问题描述】:

我有以下简化的 spring 集成流程:

int-ws:inbound-gateway ----> int:transformer ----> int-kafka:outbound-channel-adapter

基本上:

  1. 使用 int-ws:inbound-gateway 公开 Web 服务端点
  2. 来自此端点的消息被放到input 频道(第一个--->
  3. 自定义转换器转换负载 JSON 格式并添加 MESSAGE_KEY 标头(kafka 必需)
  4. 消息被放置到inputToKafka频道(第二个--->
  5. int-kafka:outbound-channel-adapter 将消息推送到 kafka 主题

网络服务操作有一个请求和一个响应负载。
请求有效负载是我要转换为 JSON 消息的内容。
一旦消息由int-kafka:outbound-channel-adapter

放置在 kafka 主题上,我想返回一个响应负载(将被编组等)

我该怎么做?

目前,当我调用 Web 服务时,一切都按预期工作,但我必须在 int-ws:inbound-gateway 上设置一个 reply-timeout 以使其不会挂起。当我这样做时,我只是在 SOAPUI 上得到一个空响应。

我了解 Gateway behavior when no response arrives 部分中的概念 - 但就我而言,我确实想生成回复

这是我的集成上下文(没有 kafka 代理配置等):

<int-ws:inbound-gateway id="ws-inbound-gateway" request-channel="input"
                        marshaller="marshaller" unmarshaller="marshaller" reply-timeout="100"/>

<int:channel id="input"/>

<int:transformer input-channel="input" output-channel="inputToKafka" method="transform">
    <bean class="com.test.InputToJSONTransformer"/>
</int:transformer>

<int:channel id="inputToKafka"/>

<int-kafka:outbound-channel-adapter kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="true"
                                    channel="inputToKafka"
                                    order="1">
</int-kafka:outbound-channel-adapter>

【问题讨论】:

    标签: spring spring-integration


    【解决方案1】:

    改变

    &lt;int:channel id="inputToKafka"/&gt;

    &lt;int:publish-subscribe-channel id="inputToKafka"/&gt;

    向频道添加第二个订阅者。

    &lt;service-activator input-channel="inputToKafka" ... order="2" /&gt;

    服务生成响应的位置;它会在成功发送到 kafka 后被调用。

    不要包含output-channel;该框架将负责将服务输出路由回 ws 网关。

    【讨论】:

    • 谢谢 - 它看起来好像它发送给第二个订阅者的响应无论发送到 Kafka 是否成功。是否有任何方法可以检测到向 kafka 发送不成功,并最好将错误消息返回给 WS?我猜不是,因为 kafka 调用包含在 Future 中,并且什么也没做。
    • 自从我们切换到 0.8.2 后,仅支持异步 kafka 发送。您需要暂停线程并设置消费者以确认消息到达 kafka,然后释放发送线程。
    【解决方案2】:

    您希望从 Kafka 出站通道适配器得到什么样的响应,记住它是 Adapter 而不是 Gateway

    但是,您可以引入另一个组件,ServiceActivator 使用输入通道 sendToInputToKafka 现在您的转换器将输出到该通道。

    您的“ServiceActivator”应该有一个@Autowire MessageChannel inputToKafka,并且应该以编程方式手动向该频道发送一条消息。 发送该消息后,您将构建所需的响应作为ServiceActivator 的返回类型和对您的ws gateway 的响应

    【讨论】:

    • 一般不需要在用户代码中注入框架组件(如MessageChannel),看我的回答。
    猜你喜欢
    • 2015-07-11
    • 2018-07-06
    • 2016-05-28
    • 1970-01-01
    • 2016-07-19
    • 1970-01-01
    • 2014-12-04
    • 1970-01-01
    • 2013-02-02
    相关资源
    最近更新 更多