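[Posted]: 2021-06-02 20:35:58
[Question]:
Basically, I have a Spring Batch job that queries a database and implements a Partitioner to get the jobs, then assigns the jobs to ThreadPoolTaskExecutors in a SlaveStep.
The Reader reads (the jobs) from the database. The Writer loads the data into a CSV file in Azure Blob Storage.
The job Partitioner and the Reader work fine. The Writer writes one file and then closes, and the other jobs cannot finish because the stream is already closed. I get the following error: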
Reading: market1
Reading: market2
Reading: market3
Reading: market4
Reading: market5
Writter: /upload-demo/market3_2021-06-01.csv
Writter: /upload-demo/market5_2021-06-01.csv
Writter: /upload-demo/market4_63_2021-06-01.csv
Writter: /upload-demo/market2_2021-06-01.csv
Writter: /upload-demo/market1_11_2021-06-01.csv
2021-06-02 08:24:42.304 ERROR 20356 --- [ taskExecutor-3] c.a.storage.common.StorageOutputStream : Stream is already closed.
2021-06-02 08:24:42.307 WARN 20356 --- [ taskExecutor-3] o.s.b.f.support.DisposableBeanAdapter : Destroy method 'close' on bean with name 'scopedTarget.writer2' threw an exception: java.lang.RuntimeException: Stream is already closed.
Reading: market6
Writter: /upload-demo/market6_2021-06-01.csv
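One plausible reading of this log is that a writer gets closed twice. Spring Batch closes the writer (an ItemStream) when the partition's step finishes; then, because writer2 is a step-scoped @Bean whose type has a public close() method, Spring infers close as its destroy method and calls it again when the step scope is destroyed, which is what the "Destroy method 'close' on bean with name 'scopedTarget.writer2'" warning reports. The dependency-free sketch below replays that sequence. FakeBlobStream, NaiveWriter and closeTwice are hypothetical names; FakeBlobStream only imitates the "Stream is already closed." behaviour seen in the log and is not the Azure SDK's StorageOutputStream.

```java
import java.io.IOException;
import java.io.OutputStream;

public class DoubleCloseDemo {

    // Hypothetical stand-in for the Azure blob stream: like the stream in the log,
    // it rejects a second close() with "Stream is already closed."
    static class FakeBlobStream extends OutputStream {
        private boolean closed;

        @Override
        public void write(int b) throws IOException {
            if (closed) {
                throw new IOException("Stream is already closed.");
            }
        }

        @Override
        public void close() {
            if (closed) {
                throw new RuntimeException("Stream is already closed.");
            }
            closed = true;
        }
    }

    // Mirrors the question's AZBlobWriter#close: it closes the stream unconditionally
    static class NaiveWriter {
        private final FakeBlobStream os = new FakeBlobStream();

        void close() {
            os.close();
        }
    }

    // Returns the message of the exception raised by the second close(), or null if none
    static String closeTwice() {
        NaiveWriter writer = new NaiveWriter();
        writer.close(); // 1st call: the step closes the writer (an ItemStream) when the partition ends
        try {
            writer.close(); // 2nd call: the inferred destroy method of the step-scoped bean
            return null;
        } catch (RuntimeException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(closeTwice()); // prints: Stream is already closed.
    }
}
```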
Here is my batch configuration:
@EnableBatchProcessing
@Configuration
public class BatchConfig extends DefaultBatchConfigurer {
String connectionString = "azureConnectionString";
String containerName = "upload-demo";
String endpoint = "azureHttpsEndpoint";
String accountName ="azureAccountName";
String accountKey = "accountKey";
StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
BlobServiceClient client = new BlobServiceClientBuilder().connectionString(connectionString).endpoint(endpoint).buildClient();
@Autowired
private StepBuilderFactory steps;
@Autowired
private JobBuilderFactory jobs;
@Autowired
@Qualifier("verticaDb")
private DataSource verticaDataSource;
@Autowired
private PlatformTransactionManager transactionManager;
@Autowired
private ConsoleItemWriter consoleItemWriter;
@Autowired
private ItemWriter itemWriter;
@Bean
public Job job() throws Exception {
return jobs.get("job1")
.start(masterStep(null, null))
.incrementer(new RunIdIncrementer())
.build();
}
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(10);
taskExecutor.initialize();
return taskExecutor;
}
@Bean
@JobScope
public Step masterStep(@Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['endDate']}") String endDate) throws Exception {
return steps.get("masterStep")
.partitioner(slaveStep().getName(), new RangePartitioner(verticaDataSource, startDate, endDate))
.step(slaveStep())
.gridSize(5)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public Step slaveStep() throws Exception {
return steps.get("slaveStep")
.<MarketData, MarketData>chunk(100)
.reader(pagingItemReader(null, null, null))
.faultTolerant()
.skip(NullPointerException.class)
.skipPolicy(new AlwaysSkipItemSkipPolicy())
.writer(writer2(null, null, null)) //consoleItemWriter
.build();
}
@Bean
@StepScope
public JdbcPagingItemReader pagingItemReader(
@Value("#{stepExecutionContext['MarketName']}") String marketName,
@Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['endDate']}") String endDate
) throws Exception {
System.out.println("Reading: " + marketName);
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
Map<String, Order> sortKey = new LinkedHashMap<>(); // insertion order = ORDER BY order (xbin, then ybin)
sortKey.put("xbin", Order.ASCENDING);
sortKey.put("ybin", Order.ASCENDING);
provider.setDataSource(this.verticaDataSource);
provider.setDatabaseType("POSTGRES");
provider.setSelectClause("SELECT MARKET AS market, EPSG AS epsg, XBIN AS xbin, YBIN AS ybin, " +
"LATITUDE AS latitude, LONGITUDE AS longitude, " +
"SUM(TOTALDOWNLINKVOLUME) AS totalDownlinkVol, SUM(TOTALUPLINKVOLUME) AS totalUplinkVol");
provider.setFromClause("FROM views.geo_analytics");
provider.setWhereClause(
"WHERE market='" + marketName + "'" +
" AND STARTTIME >= '" + startDate + "'" +
" AND STARTTIME < '" + endDate + "'" +
" AND TOTALUPLINKVOLUME IS NOT NULL" +
" AND TOTALUPLINKVOLUME > 0" +
" AND TOTALDOWNLINKVOLUME IS NOT NULL" +
" AND TOTALDOWNLINKVOLUME > 0" +
" AND EPSG IS NOT NULL" +
" AND LATITUDE IS NOT NULL" +
" AND LONGITUDE IS NOT NULL" +
" AND XBIN IS NOT NULL" +
" AND YBIN IS NOT NULL"
);
provider.setGroupClause("GROUP BY XBIN, YBIN, MARKET, EPSG, LATITUDE, LONGITUDE");
provider.setSortKeys(sortKey);
JdbcPagingItemReader reader = new JdbcPagingItemReader();
reader.setDataSource(this.verticaDataSource);
reader.setQueryProvider(provider.getObject());
reader.setFetchSize(1000);
reader.setRowMapper(new BeanPropertyRowMapper() {
{
setMappedClass((MarketData.class));
}
});
return reader;
}
@Bean
@StepScope
public FlatFileItemWriter<MarketData> writer2(@Value("#{jobParameters['yearMonth']}") String yearMonth,
@Value("#{stepExecutionContext['marketName']}") String marketName,
@Value("#{jobParameters['startDate']}") String startDate) throws URISyntaxException, InvalidKeyException, StorageException, IOException {
AZBlobWriter<MarketData> writer = new AZBlobWriter<>();
String fullPath =marketName + "_" + startDate + ".csv";
String resourceString = "azure-blob://upload-demo/" + fullPath;
CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString);
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
CloudBlobContainer container2 = blobClient.getContainerReference(containerName);
container2.createIfNotExists();
AzureStorageResourcePatternResolver storageResourcePatternResolver = new AzureStorageResourcePatternResolver(client);
Resource resource = storageResourcePatternResolver.getResource(resourceString);
System.out.println("Writter: " + resource.getURI().getPath().toString());
writer.setResource(resource);
writer.setStorage(container2);
writer.setLineAggregator(new DelimitedLineAggregator<MarketData>() {
{
setDelimiter(",");
setFieldExtractor(new BeanWrapperFieldExtractor<MarketData>() {
{
setNames(new String[] {
"market",
"epsg",
"xbin",
"ybin",
"latitude",
"longitude",
"totalDownlinkVol",
"totalUplinkVol"
});
}
});
}
});
return writer;
}
}
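Two details of pagingItemReader may be worth tightening. JdbcPagingItemReader builds its ORDER BY clause (and the condition that fetches the next page) by iterating the sort-key map, so the map should preserve insertion order (a LinkedHashMap rather than a HashMap) to keep xbin ahead of ybin deterministically. Also, the WHERE clause concatenates job parameters into the SQL string; named placeholders bound through JdbcPagingItemReader#setParameterValues avoid quoting and injection problems. The plain-Java sketch below only builds those pieces. The Order enum is a local stand-in for Spring Batch's, and sortKeys, whereClause and parameterValues are hypothetical helper names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PagingQuerySketch {

    // Local stand-in for org.springframework.batch.item.database.Order, to stay dependency-free
    enum Order { ASCENDING, DESCENDING }

    // Sort keys in the order they should appear in ORDER BY: xbin first, then ybin
    static Map<String, Order> sortKeys() {
        Map<String, Order> keys = new LinkedHashMap<>(); // iteration order = insertion order
        keys.put("xbin", Order.ASCENDING);
        keys.put("ybin", Order.ASCENDING);
        return keys;
    }

    // WHERE clause with named placeholders instead of concatenated job parameters;
    // the literal IS NOT NULL / > 0 filters from the question would follow unchanged
    static String whereClause() {
        return "WHERE market = :market"
                + " AND STARTTIME >= :startDate"
                + " AND STARTTIME < :endDate";
    }

    // Values for the placeholders, to be handed to JdbcPagingItemReader#setParameterValues
    static Map<String, Object> parameterValues(String market, String startDate, String endDate) {
        Map<String, Object> params = new HashMap<>();
        params.put("market", market);
        params.put("startDate", startDate);
        params.put("endDate", endDate);
        return params;
    }

    public static void main(String[] args) {
        List<String> order = new ArrayList<>(sortKeys().keySet());
        System.out.println(order); // prints [xbin, ybin]
        System.out.println(whereClause());
        System.out.println(parameterValues("market1", "2021-06-01", "2021-06-02"));
    }
}
```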
Before this I ran into other problems, such as setting the FlatFileItemWriter's resource to an Azure Blob: Spring Batch / Azure Storage account blob resource [container"foo", blob='bar'] cannot be resolved to absolute file path.
Following @Mahmoud Ben Hassine's suggestion, I implemented a FlatFileItemWriter for Azure Blob.
As a base, I used the (GCP) FlatFileItemWriter implementation from this post: how to configure FlatFileItemWriter to output the file to a ByteArrayRecource?
Here is the implementation for Azure Blob:
public class AZBlobWriter<T> extends FlatFileItemWriter<T> {
private CloudBlobContainer storage;
private Resource resource;
private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator");
private OutputStream os;
private String lineSeparator = DEFAULT_LINE_SEPARATOR;
@Override
public void write(List<? extends T> items) throws Exception {
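StringBuilder lines = new StringBuilder();
for (T item : items) {
// Render each item with the configured LineAggregator (lineAggregator is the protected field inherited from FlatFileItemWriter in Spring Batch 4.1+), not with toString()
lines.append(this.lineAggregator.aggregate(item)).append(lineSeparator);
}
byte[] bytes = lines.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);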
try {
os.write(bytes);
}
catch (IOException e) {
throw new WriteFailedException("Could not write data. The file may be corrupt.", e);
}
os.flush();
}
@Override
public void open(ExecutionContext executionContext) {
try {
// Open the blob's output stream; this writer owns it and must release it in close()
os = ((WritableResource) resource).getOutputStream();
} catch (IOException e) {
// Fail fast instead of printing the stack trace and later writing to a null stream
throw new ItemStreamException("Could not open output stream for " + resource, e);
}
}
@Override
public void update(ExecutionContext executionContext) {
}
@Override
public void close() {
super.close();
try {
os.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public void setStorage(CloudBlobContainer storage) {
this.storage = storage;
}
@Override
public void setResource(Resource resource) {
this.resource = resource;
}
}
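For the CSV columns configured in writer2 (a DelimitedLineAggregator with a BeanWrapperFieldExtractor) to reach the blob, write() needs to turn each item into a line through the configured LineAggregator rather than through the item's toString(). The dependency-free sketch below approximates what that aggregation produces. MarketRow is a hypothetical, trimmed-down MarketData, and the FIELDS list of extractors only plays the role of BeanWrapperFieldExtractor#setNames; it is not the Spring Batch implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CsvLineSketch {

    // Hypothetical, trimmed-down MarketData with only a few of the question's columns
    static class MarketRow {
        final String market;
        final int xbin;
        final int ybin;
        final double totalDownlinkVol;

        MarketRow(String market, int xbin, int ybin, double totalDownlinkVol) {
            this.market = market;
            this.xbin = xbin;
            this.ybin = ybin;
            this.totalDownlinkVol = totalDownlinkVol;
        }
    }

    // Column extractors in a fixed order, standing in for BeanWrapperFieldExtractor#setNames
    static final List<Function<MarketRow, Object>> FIELDS = Arrays.<Function<MarketRow, Object>>asList(
            r -> r.market,
            r -> r.xbin,
            r -> r.ybin,
            r -> r.totalDownlinkVol);

    // Rough equivalent of DelimitedLineAggregator#aggregate with delimiter ","
    static String aggregate(MarketRow row) {
        return FIELDS.stream()
                .map(f -> String.valueOf(f.apply(row)))
                .collect(Collectors.joining(","));
    }

    // What write(items) should produce for one chunk: one aggregated line per item
    static String chunk(List<MarketRow> items, String lineSeparator) {
        StringBuilder lines = new StringBuilder();
        for (MarketRow item : items) {
            lines.append(aggregate(item)).append(lineSeparator);
        }
        return lines.toString();
    }

    public static void main(String[] args) {
        List<MarketRow> items = Arrays.asList(
                new MarketRow("market1", 3, 7, 12.5),
                new MarketRow("market1", 3, 8, 0.25));
        System.out.print(chunk(items, "\n"));
        // prints:
        // market1,3,7,12.5
        // market1,3,8,0.25
    }
}
```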
Any help is much appreciated. Apologies for the "dirty code"; I'm still testing/developing it.
Thanks, Marcus.
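[Discussion]:
- Just an (intuitive) guess (I'm missing the link/stack trace from your writer to c.a.storage.common.StorageOutputStream): do nothing on / don't override close()! (?)
- @xerx593 That would cause a resource leak: the writer opens an output stream on the resource, so it should close it.
- ...yes, but look at the source code: close() is a "NO-OP" whose only purpose is to throw this exception when it is called twice! ;) ...Apart from that, the overridden close is also a NO-OP! ;)
- Besides close()ing, flush()ing also "prepares"/enables this exception.
- @xerx593 You have just confirmed a good reason for AZBlobWriter#close to override close and close the output stream that AZBlobWriter#open opened on the resource. My comment was about your suggestion: Do nothing on/don't override close() !(?). If you do nothing or don't override close, nobody will close that opened output stream. So regardless of what super.close does (no-op or not), AZBlobWriter is responsible for handling its own resources.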
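Both comments can hold at once: the writer must release the stream it opened in open(), yet the Azure stream rejects a second close(). A common way to satisfy both is to make close() idempotent by dropping the stream reference after the first close. The sketch below shows that pattern without Spring or Azure dependencies; StrictStream, OwningWriter and closeTwice are hypothetical names, and in the real AZBlobWriter the IOException would more naturally be wrapped in Spring Batch's ItemStreamException. Another option, assuming the second call comes from bean destruction as the WARN line suggests, is to declare the writer bean with @Bean(destroyMethod = "") so Spring does not infer close() as a destroy method and only the step closes the writer.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

public class IdempotentCloseSketch {

    // Hypothetical stand-in for the blob stream: it counts close() calls and,
    // like the stream in the log, rejects a second one
    static class StrictStream extends OutputStream {
        private boolean closed;
        int closeCalls;

        @Override
        public void write(int b) throws IOException {
            if (closed) {
                throw new IOException("Stream is already closed.");
            }
        }

        @Override
        public void close() {
            closeCalls++;
            if (closed) {
                throw new RuntimeException("Stream is already closed.");
            }
            closed = true;
        }
    }

    // Simplified writer that owns the stream it opened and releases it at most once
    static class OwningWriter {
        private OutputStream os;

        void open(OutputStream stream) {
            this.os = stream;
        }

        void close() {
            if (os == null) {
                return; // already closed (or never opened): nothing left to release
            }
            try {
                os.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } finally {
                os = null; // later close() calls, e.g. from a bean destroy callback, become no-ops
            }
        }
    }

    // Closes the writer twice and reports how many times the underlying stream was closed
    static int closeTwice() {
        StrictStream stream = new StrictStream();
        OwningWriter writer = new OwningWriter();
        writer.open(stream);
        writer.close(); // first call: the step closes its ItemStream
        writer.close(); // second call: no exception, the stream is not touched again
        return stream.closeCalls;
    }

    public static void main(String[] args) {
        System.out.println(closeTwice()); // prints 1
    }
}
```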
Tags: spring spring-batch