【问题标题】:Timeout waiting for connection from pool while polling S3 for Objects为对象轮询 S3 时等待来自池的连接超时
【发布时间】:2019-01-05 21:06:27
【问题描述】:

我正在开发一个后端服务,该服务使用 spring aws 集成定期轮询 S3 存储桶并处理来自 S3 的轮询对象。下面是它的实现

@Configuration
@EnableIntegration
@IntegrationComponentScan
@EnableAsync
public class S3PollerConfiguration {

    //private static final Logger log = (Logger) LoggerFactory.getLogger(S3PollerConfiguration.class);

    @Value("${amazonProperties.bucketName}")
    private String bucketName;

    @Bean
    @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "5"))
    public MessageSource<InputStream> s3InboundStreamingMessageSource() {    
        S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
        messageSource.setRemoteDirectory(bucketName);   
        return messageSource;
    }

    @Bean
    public S3RemoteFileTemplate template() {
        return new S3RemoteFileTemplate(new S3SessionFactory(thumbnailGeneratorService.getImagesS3Client()));
    }

    @Bean
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow fileReadingFlow() throws IOException {
        return IntegrationFlows
                .from(s3InboundStreamingMessageSource(),
                        e -> e.poller(p -> p.fixedDelay(10, TimeUnit.SECONDS)))
                .handle(Message.class, (payload, header) -> processS3Object(payload.getHeaders(), payload.getPayload()))
                .get();
    }
}

我在对象上传时从 S3 获取消息,并且我能够使用作为消息有效负载的一部分接收的输入流来处理它。但我在这里面临的问题是,在收到几条消息后,我得到了“等待来自池的连接超时”异常

2019-01-06 02:19:06.156 ERROR 11322 --- [ask-scheduler-5] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:445)
    at org.springframework.integration.file.remote.RemoteFileTemplate.list(RemoteFileTemplate.java:405)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.listFiles(AbstractRemoteFileStreamingMessageSource.java:194)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.poll(AbstractRemoteFileStreamingMessageSource.java:180)
    at org.springframework.integration.aws.inbound.S3StreamingMessageSource.poll(S3StreamingMessageSource.java:70)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:153)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:155)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:236)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:250)

我知道该问题与未关闭已打开的 S3Object (如此处 https://github.com/aws/aws-sdk-java/issues/1405 所述)有关,因此我已实现关闭作为消息有效负载的一部分接收的 S3Object 的输入流。但这并不能解决问题,我不断收到异常。有人可以帮我解决这个问题吗?

【问题讨论】:

  • 你能分享一下你是如何关闭这些流的吗?
  • public MessageHandler processS3Object(MessageHeaders headers, Object obj) { InputStream is = (InputStream) obj; // 使用 is 进行处理; is.close(); }
  • @slimane :感谢您提供帮助。我只是在处理流后关闭输入流。我想我需要关闭 S3Object 但不知道如何在流式消息中访问它
  • 尝试从 s3InboundStreamingMessageSource() 中删除 @Bean @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "5")),因为您将在你的上下文
  • @slimane :这是问题的原因吗?

标签: java amazon-s3 streaming spring-integration spring-integration-aws


【解决方案1】:

您仍然在配置中将消息注释声明与 Java DSL 混合使用的问题。

看起来在fileReadingFlow 中,您在代码processS3Object() 方法中关闭了那些InputStreams,但您对InputStreams 产生的@InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "5")) 无能为力。 为什么你把它放在首位?如果您不使用该代码,是什么让您保留它?

S3StreamingMessageSource 一直被轮询两次:@InboundChannelAdapterIntegrationFlows.from()

您只需从 S3StreamingMessageSource bean 定义中删除 @InboundChannelAdapter 即可。

请阅读更多参考手册以确定此类注释的原因以及在使用 Java DSL 时如何不需要它:

https://docs.spring.io/spring-integration/reference/html/configuration.html#_using_the_literal_inboundchanneladapter_literal_annotation

https://docs.spring.io/spring-integration/reference/html/java-dsl.html#java-dsl-inbound-adapters

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-10-14
    • 1970-01-01
    • 2017-01-04
    • 2011-11-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多