【问题标题】:Spring Batch - FlatFileItemWriter Error 14416: Stream is already closedSpring Batch - FlatFileItemWriter 错误 14416:流已关闭
【发布时间】:2021-06-02 20:35:58
【问题描述】:

基本上我有一个 Spring Batch,它查询数据库并实现 Partitioner 以获取作业,并将作业分配给 SlaveStep 中的 ThreadPoolTask​​Executors。

Reader 从数据库中读取(作业)。 Writer 将数据加载到 Azure Blob 存储中的 csv 文件中。

Job Partitioner 和 Reader 工作正常。 Writer 写入一个文件,然后关闭,其他作业无法完成,因为流已关闭。我收到以下错误:

Reading: market1
Reading: market2
Reading: market3
Reading: market4
Reading: market5
Writter: /upload-demo/market3_2021-06-01.csv
Writter: /upload-demo/market5_2021-06-01.csv
Writter: /upload-demo/market4_63_2021-06-01.csv
Writter: /upload-demo/market2_2021-06-01.csv
Writter: /upload-demo/market1_11_2021-06-01.csv
2021-06-02 08:24:42.304 ERROR 20356 --- [ taskExecutor-3] c.a.storage.common.StorageOutputStream   : Stream is already closed.
2021-06-02 08:24:42.307  WARN 20356 --- [ taskExecutor-3] o.s.b.f.support.DisposableBeanAdapter    : Destroy method 'close' on bean with name 'scopedTarget.writer2' threw an exception: java.lang.RuntimeException: Stream is already closed.
Reading: market6
Writter: /upload-demo/market6_2021-06-01.csv

这是我的批量配置:

@EnableBatchProcessing
@Configuration
public class BatchConfig extends DefaultBatchConfigurer {

    String connectionString = "azureConnectionString";

    String containerName = "upload-demo";

    String endpoint = "azureHttpsEndpoint";

    String accountName ="azureAccountName";
    String accountKey = "accountKey";

    StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
    BlobServiceClient client = new BlobServiceClientBuilder().connectionString(connectionString).endpoint(endpoint).buildClient();

    @Autowired
    private StepBuilderFactory steps;

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    @Qualifier("verticaDb")
    private DataSource verticaDataSource;

    @Autowired
    private PlatformTransactionManager transactionManager;

    @Autowired
    private ConsoleItemWriter consoleItemWriter;

    @Autowired
    private ItemWriter itemWriter;

    @Bean
    public Job job() throws Exception {
        return jobs.get("job1")
                .start(masterStep(null, null))
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean
    @JobScope
    public Step masterStep(@Value("#{jobParameters['startDate']}") String startDate,
                           @Value("#{jobParameters['endDate']}") String endDate) throws Exception {

        return steps.get("masterStep")
                .partitioner(slaveStep().getName(), new RangePartitioner(verticaDataSource, startDate, endDate))
                .step(slaveStep())
                .gridSize(5)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Step slaveStep() throws Exception {
        return steps.get("slaveStep")
                .<MarketData, MarketData>chunk(100)
                .reader(pagingItemReader(null, null, null))
                .faultTolerant()
                .skip(NullPointerException.class)
                .skipPolicy(new AlwaysSkipItemSkipPolicy())
                .writer(writer2(null, null, null))  //consoleItemWriter
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader pagingItemReader(
            @Value("#{stepExecutionContext['MarketName']}") String marketName,
            @Value("#{jobParameters['startDate']}") String startDate,
            @Value("#{jobParameters['endDate']}") String endDate
            ) throws Exception {

System.out.println("Reading: " + marketName);

        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        Map<String, Order> sortKey = new HashMap<>();
        sortKey.put("xbin", Order.ASCENDING);
        sortKey.put("ybin", Order.ASCENDING);

        provider.setDataSource(this.verticaDataSource);
        provider.setDatabaseType("POSTGRES");
        provider.setSelectClause("SELECT MARKET AS market, EPSG AS epsg, XBIN AS xbin, YBIN AS ybin, " +
                        "LATITUDE AS latitude, LONGITUDE AS longitude, " +
                        "SUM(TOTALUPLINKVOLUME) AS totalDownlinkVol, SUM(TOTALDOWNLINKVOLUME) AS totalUplinkVol");
        provider.setFromClause("FROM views.geo_analytics");
        provider.setWhereClause(
                "WHERE market='" + marketName + "'" +
                        " AND STARTTIME >= '" + startDate + "'" +
                        " AND STARTTIME < '" + endDate + "'" +
                        " AND TOTALUPLINKVOLUME IS NOT NULL" +
                        " AND TOTALUPLINKVOLUME > 0" +
                        " AND TOTALDOWNLINKVOLUME IS NOT NULL" +
                        " AND TOTALDOWNLINKVOLUME > 0" +
                        " AND EPSG IS NOT NULL" +
                        " AND LATITUDE IS NOT NULL" +
                        " AND LONGITUDE IS NOT NULL" +
                        " AND XBIN IS NOT NULL" +
                        " AND YBIN IS NOT NULL"
        );
        provider.setGroupClause("GROUP BY XBIN, YBIN, MARKET, EPSG, LATITUDE, LONGITUDE");
        provider.setSortKeys(sortKey);

        JdbcPagingItemReader reader = new JdbcPagingItemReader();
        reader.setDataSource(this.verticaDataSource);
        reader.setQueryProvider(provider.getObject());
        reader.setFetchSize(1000);
        reader.setRowMapper(new BeanPropertyRowMapper() {
            {
                setMappedClass((MarketData.class));
            }
        });
        return reader;
    }

    @Bean
    @StepScope
    public FlatFileItemWriter<MarketData> writer2(@Value("#{jobParameters['yearMonth']}") String yearMonth,
                                                 @Value("#{stepExecutionContext['marketName']}") String marketName,
                                                 @Value("#{jobParameters['startDate']}") String startDate) throws URISyntaxException, InvalidKeyException, StorageException, IOException {

        AZBlobWriter<MarketData> writer = new AZBlobWriter<>();

        String fullPath =marketName + "_" + startDate + ".csv";
        String resourceString = "azure-blob://upload-demo/" + fullPath;

        CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString);
        CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
        CloudBlobContainer container2 = blobClient.getContainerReference(containerName);
        container2.createIfNotExists();

        AzureStorageResourcePatternResolver storageResourcePatternResolver = new AzureStorageResourcePatternResolver(client);
        Resource resource = storageResourcePatternResolver.getResource(resourceString);


System.out.println("Writter: " + resource.getURI().getPath().toString());

        writer.setResource(resource);
        writer.setStorage(container2);

        writer.setLineAggregator(new DelimitedLineAggregator<MarketData>() {
            {
                setDelimiter(",");
                setFieldExtractor(new BeanWrapperFieldExtractor<MarketData>() {
                    {
                        setNames(new String[] {
                                "market",
                                "epsg",
                                "xbin",
                                "ybin",
                                "latitude",
                                "longitude",
                                "totalDownlinkVol",
                                "totalUplinkVol"
                        });
                    }
                });
            }
        });
        return writer;
    }
}

之前我遇到过其他问题,例如将 FlatFileWriter 的资源设置为 Azure Blob,Spring Batch / Azure Storage account blob resource [container"foo", blob='bar'] cannot be resolved to absolute file path

按照@Mahmoud Ben Hassine 的建议,为 Azure Blob 实现 FlatFileWriter。

我在这篇文章中用作基础 (GCP) 的 FlatFileWriter 的实现:how to configure FlatFileItemWriter to output the file to a ByteArrayRecource?

这是 Azure Blob 的实现:

public class AZBlobWriter<T> extends FlatFileItemWriter<T> {

    private CloudBlobContainer storage;
    private Resource resource;

    private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator");
    private OutputStream os;

    private String lineSeparator = DEFAULT_LINE_SEPARATOR;

    @Override
    public void write(List<? extends T> items) throws Exception {

        StringBuilder lines = new StringBuilder();
        for (T item : items) {
            lines.append(item).append(lineSeparator);
        }
        byte[] bytes = lines.toString().getBytes();
        try {
            os.write(bytes);
        }
        catch (IOException e) {
            throw new WriteFailedException("Could not write data.  The file may be corrupt.", e);
        }
        os.flush();
    }

    @Override
    public void open(ExecutionContext executionContext) {
        try {
            os = ((WritableResource)resource).getOutputStream();
            String bucket = resource.getURI().getHost();
            String filePath = resource.getURI().getPath().substring(1);

            CloudBlockBlob blob = storage.getBlockBlobReference(filePath);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (StorageException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void update(ExecutionContext executionContext) {
    }

    @Override
    public void close() {
        super.close();

        try {
            os.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void setStorage(CloudBlobContainer storage) {
        this.storage = storage;
    }
    @Override
    public void setResource(Resource resource) {
        this.resource = resource;
    }
}

非常感谢任何帮助。对于“脏代码”,我深表歉意,因为我仍在测试/开发它。

谢谢,马库斯。

【问题讨论】:

  • 只是一个(直观的)猜测(我想念你的作者到c.a.storage.common.StorageOutputStream 的链接/堆栈跟踪):什么都不做/不覆盖close()!(?)
  • @xerx593 这会导致资源泄漏:作者在资源上打开了一个输出流,所以它应该关闭它。
  • ...是的,但是看看source codeclose() 是一个“NO-OP”,它的唯一目的是:当它被调用两次时抛出这个异常! ;) ..除此之外:被覆盖的 close 也是 NO-OP! ;)
  • 除了close()ing flush()ing 还“准备”/启用此异常。
  • @xerx593 您刚刚确认了此处AZBlobWriter#close 覆盖close 并关闭它在AZBlobWriter#open 中的资源上打开的输出流的充分理由。我的评论是关于你的建议:Do nothing on/don't override close() !(?) 。如果你什么都不做或者不覆盖close,没有人会关闭那个打开的输出流。所以不管super.close的内容(即no-op与否),AZBlobWrite负责处理自己的资源。

标签: spring spring-batch


【解决方案1】:

您没有共享整个堆栈跟踪以查看此错误何时发生,但似乎不止一次调用了close 方法。我认为这不是由于并发问题,因为我看到您在分区步骤中每个线程使用一个编写器。所以我会通过在关闭它之前检查输出流是否已经关闭来使这个方法“重入”(输出流上没有 isClosed 方法,所以你可以使用自定义布尔值)。

也就是说,我首先确认close 方法被调用了两次,如果是,请调查原因并修复根本原因。

【讨论】:

  • 我在 Reader 和 Writter 中都添加了一个 System.out,并更新了错误堆栈。错误堆栈上没有太多描述。出错后,它会打印出另一个市场,并且不会离开它。
  • 最初我认为作者的多线程导致了这个问题,因为你确认了。我将作业分区器限制为 20 个作业/文件,将阅读器限制为大约 10 行。写入 AZ Blob 是正确的。我只需要解决 isClosed 部分,并确保它不会尝试关闭两次。感谢您的帮助/澄清!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-05-01
  • 2013-10-12
  • 1970-01-01
  • 1970-01-01
  • 2016-12-01
相关资源
最近更新 更多