【问题标题】:无法在 Spring 集成中将订阅者附加到 BroadCastingDispatcher
【发布时间】:2020-08-30 09:40:44
【问题描述】:

按照Bootiful GCP: Integration with Google Cloud Pub/Sub (4/8) 的这个示例,我试图构建一个流程,从 Google Pubsub 订阅中读取数据并写入另一个主题。

DEBUG 模式下启动我的应用程序后,我可以看到消息是从Google PubSub 到达的,但因此它们没有被“消耗”

o.s.i.dispatcher.BroadcastingDispatcher : 没有订阅者,默认行为是忽略

非常感谢您对此的任何帮助。

以下是我的主要代码的样子-

public class PubsubRouteBuilderService {

    private final PubSubTemplate pubSubTemplate; // injected via Spring

    public PubsubRouteBuilderService(PubSubTemplate pubSubTemplate) {
        this.pubSubTemplate = pubSubTemplate;
    }

    public synchronized boolean buildRoute(PubsubRouteModel pubsubRouteModel) {
        log.info("Building route for: {}", pubsubRouteModel);
        buildPubsubRoute(pubsubRouteModel);
        // some unrelated logic
        return true;
    }

    private void buildPubsubRoute(PubsubRouteModel pubsubRouteModel) {

        final StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(
            RouteBuilderFactory
                    .messageChannelAdapter(
                            RouteBuilderFactory.getMessageChannel(),
                            pubSubTemplate,
                            pubsubRouteModel.getFromSub()))
            .handle(
                    message -> {
                        log.info("consumed new message: [" + message.getPayload() + "]");
                        AckReplyConsumer consumer = message.getHeaders()
                                .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, AckReplyConsumer.class);
                        consumer.ack();
                    })
            .get();

        standardIntegrationFlow.start();
    }
}

下面是RouteBuilderFactory的其他方法-

public static MessageChannel getMessageChannel() {
    return MessageChannels.publishSubscribe().get();
}

public static PubSubInboundChannelAdapter messageChannelAdapter(MessageChannel inputChannel, PubSubTemplate pubSubTemplate, String channelName) {
    PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, channelName);
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);
    return adapter;
}

【问题讨论】:

    标签: java-8 spring-integration spring-integration-dsl spring-cloud-gcp


    【解决方案1】:

    您的代码似乎根本不是基于该博客文章...

    private void buildPubsubRoute(PubsubRouteModel pubsubRouteModel) {
    
        final StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(
            RouteBuilderFactory
                    .messageChannelAdapter(
                            RouteBuilderFactory.getMessageChannel(),
                            pubSubTemplate,
                            pubsubRouteModel.getFromSub()))
            .handle(
                    message -> {
                        log.info("consumed new message: [" + message.getPayload() + "]");
                        AckReplyConsumer consumer = message.getHeaders()
                                .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, AckReplyConsumer.class);
                        consumer.ack();
                    })
            .get();
    
        standardIntegrationFlow.start();
    }
    

    您不能只是“启动”一些任意的 IntegrationFlow 对象 - 它必须由 Spring 管理(声明为 @Bean)。

    框架在幕后构建了一些基础架构来完成所有这些工作。

    【讨论】:

    • 我想在不由 Spring 管理的情况下让它工作,因为我的用例是我必须动态启动和关闭路由(在应用程序运行时非常好)。如果没有spring-integration,我会更好吗?
    • 我们在 Spring Integration 中不称它们为“路由”;它们是流。对于动态流注册,请参阅the documentation
    • 看完文档,简直不敢相信!
    猜你喜欢
    • 1970-01-01
    • 2019-10-04
    • 2012-10-06
    • 2018-08-02
    • 2016-01-12
    • 1970-01-01
    • 2017-05-31
    • 2013-08-16
    • 1970-01-01
    相关资源
    最近更新 更多