【Title】: Spring Integration - Async service activator for SQS messages
【Posted】: 2020-05-18 09:46:11
【Question】:

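I am trying to implement an integration flow for an SQS queue using a void async service activator, but the handling logic is never triggered. The message is received in the flow and successfully converted by my custom transformer, but the async handling never completes.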

This is my configuration class:

@Configuration
public class SqsConfiguration {
    /**
     ...
     ...
    **/


    @Bean("amazonSQSClientConfiguration")
    ClientConfiguration getAmazonSQSClientConfiguration() {
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setConnectionTimeout(connectionTimeout);
        clientConfiguration.setMaxConnections(maxConnections);
        clientConfiguration.setSocketTimeout(socketTimeout);
        clientConfiguration.setMaxConsecutiveRetriesBeforeThrottling(maxConsecutiveRetriesBeforeThrottling);
        return clientConfiguration;
    }

    @Bean("amazonSQSAsync")
    AmazonSQSAsync getAmazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard()
                .withClientConfiguration(getAmazonSQSClientConfiguration())
                .withRegion(this.region)
                .build();
    }

    @Bean("amazonSQSRequestListenerContainerConsumerPool")
    protected ThreadPoolTaskExecutor amazonSQSRequestListenerContainerConsumerPool() {
        int maxSize = (int) Math.round(concurrentHandlers * poolSizeFactor);
        int queueCapacity = (int) Math.round(concurrentHandlers * poolQueueSizeFactor);
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(concurrentHandlers);
        taskExecutor.setMaxPoolSize(maxSize);
        taskExecutor.setKeepAliveSeconds(poolKeepAliveTimeSeconds);
        taskExecutor.setQueueCapacity(queueCapacity);
        taskExecutor.setThreadFactory(new NamedDaemonThreadFactory("AmazonSQSRequestHandler"));
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        log.info(
                String.format(
                        "Amazon SQS request handler pool settings: {coreSize: %d, maxSize: %d, queueCapacity: %d}",
                        concurrentHandlers,
                        maxSize,
                        queueCapacity
                )
        );
        return taskExecutor;
    }

    @Bean("sqsMessageDrivenChannelAdapter")
    public MessageProducerSupport sqsMessageDrivenChannelAdapter() {
        SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(getAmazonSQSAsync(), this.queueName);
        adapter.setMaxNumberOfMessages(this.maxNumberOfMessages);
        adapter.setVisibilityTimeout(this.visibilityTimeout);
        adapter.setSendTimeout(this.sendTimeout);
        adapter.setWaitTimeOut(this.waitTimeOut);
        adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
        adapter.setTaskExecutor(amazonSQSRequestListenerContainerConsumerPool());
        return adapter;
    }


    @Bean
    @SuppressWarnings("unchecked")
    IntegrationFlow sqsRequestIntegrationFlow() {
        SqsEventHandlerDispatcher commandHandler = applicationContext.getBean(SqsEventHandlerDispatcher.class);
        return IntegrationFlows.from(sqsMessageDrivenChannelAdapter())
                .transform(converter::toEvent)
                .log()
                .handle(commandHandler, "handle", a -> a.async(true))
                .log()
                .get();
    }
}

This is my handler:

@Slf4j
@Component
@MessageEndpoint
public class SqsEventHandlerDispatcher {
    /**
     ...
     ...
    **/


    public ListenableFuture<?> handle(EventMessage event) {

        return new ListenableFutureTask<Void>(() -> doHandle(event), null);
    }

    private void doHandle(EventMessage event) {
         //my handling logic 

    }
}

The logic in the doHandle() method is never reached.

The same integration flow works perfectly with a synchronous handler that returns void:

 @Bean
    @SuppressWarnings("unchecked")
    IntegrationFlow sqsRequestIntegrationFlow() {
        SqsEventHandlerDispatcher commandHandler = applicationContext.getBean(SqsEventHandlerDispatcher.class);
        return IntegrationFlows.from(sqsMessageDrivenChannelAdapter())
                .transform(converter::toEvent)
                .log()
                .handle(commandHandler, "handle")
                .log()
                .get();
    }

===============================================================================
@Slf4j
@Component
@MessageEndpoint
public class SqsEventHandlerDispatcher {

public void handle(EventMessage event) {
         //my handling logic
    }

}

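Am I missing something? Or could I achieve this by using Mono? I do not have much experience with either Spring Integration or asynchronous processing.

【Question discussion】:

    Tags: java asynchronous spring-integration spring-integration-aws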


    【Answer 1】:

    I found a solution using reactive Java. This is what my service activator looks like now:

     public Mono<Void> handle(EventMessage event, @Header(AwsHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
            // Run the handling logic on a scheduler thread; acknowledge only after it succeeds.
            return Mono.<Void>fromRunnable(() -> doHandle(event)).subscribeOn(Schedulers.elastic())
                    .doOnSuccess(r -> {
                        log.trace("Message successfully processed. Will delete it now!");
                        acknowledgment.acknowledge();
                    });
        }
    
        private void doHandle(EventMessage event) {
           //my handling logic
        }
    

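    I also changed the SQS message deletion policy to NEVER, and I acknowledge manually once the message has been processed successfully and can be deleted.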

    adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER);
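
    The acknowledge-only-on-success idea can be illustrated without Spring or Reactor. The following is a minimal sketch using only `java.util.concurrent`; the `Ack` interface is a hypothetical stand-in for the `Acknowledgment` header object, and `process` and `ackCountFor` are illustrative names that do not appear in the original code. It only shows that the acknowledgment step is skipped when the handler throws, which with SQS would leave the message in the queue to be redelivered after the visibility timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

public class AckOnSuccessDemo {

    // Hypothetical stand-in for the SQS Acknowledgment header object.
    interface Ack {
        void acknowledge();
    }

    // Runs the handler asynchronously and acknowledges only if it completes normally.
    static CompletableFuture<Void> process(Runnable handler, Ack ack) {
        return CompletableFuture.runAsync(handler)
                .thenRun(ack::acknowledge); // skipped when the handler throws
    }

    // Returns how many times the message was acknowledged for a given handler.
    static int ackCountFor(Runnable handler) {
        AtomicInteger acks = new AtomicInteger();
        try {
            process(handler, acks::incrementAndGet).join();
        } catch (CompletionException e) {
            // Handler failed: the message stays unacknowledged.
        }
        return acks.get();
    }

    public static void main(String[] args) {
        System.out.println("acks after success: " + ackCountFor(() -> { }));
        System.out.println("acks after failure: "
                + ackCountFor(() -> { throw new IllegalStateException("boom"); }));
    }
}
```

    The reactive version above follows the same shape: `doOnSuccess` plays the role of `thenRun`, so a failing `doHandle` never reaches `acknowledgment.acknowledge()`.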
    

    【Comments】:

    • Never use ListenableFutureTask like this, but glad you found a solution!
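
    A likely reason the original handler never reached doHandle(), consistent with the comment above: `ListenableFutureTask` extends `java.util.concurrent.FutureTask`, and a `FutureTask` only executes its body when something calls `run()` on it or submits it to an executor. Constructing and returning it does neither. The sketch below uses plain `FutureTask` as a stand-in so it runs without Spring on the classpath; the class and method names are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class FutureTaskDemo {

    // Builds a FutureTask but never runs it, mirroring the handler in the question.
    static boolean ranWithoutExecution() {
        AtomicBoolean ran = new AtomicBoolean(false);
        FutureTask<Void> task = new FutureTask<>(() -> ran.set(true), null);
        // Nothing calls task.run() or submits the task to an executor,
        // so the Runnable is never invoked and the future never completes.
        return ran.get() || task.isDone();
    }

    // Runs the task explicitly; only then does the body execute.
    static boolean ranAfterRun() {
        AtomicBoolean ran = new AtomicBoolean(false);
        FutureTask<Void> task = new FutureTask<>(() -> ran.set(true), null);
        task.run();
        return ran.get() && task.isDone();
    }

    // CompletableFuture.runAsync schedules the work on a pool thread right away.
    static boolean ranViaRunAsync() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> ran.set(true));
        future.join(); // wait for completion, for demonstration only
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("constructed only: " + ranWithoutExecution()); // false
        System.out.println("after run():      " + ranAfterRun());         // true
        System.out.println("via runAsync():   " + ranViaRunAsync());      // true
    }
}
```

    In the Spring setup from the question, the equivalent fix would presumably be to hand the work to an executor that returns an already-scheduled future instead of building the task by hand; that part is an assumption and was not confirmed in the thread.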