经测试,下面这套简单配置可以从 S3 上经过压缩和编码的文件中逐行读取数据。
关键点:
- 实现一个基于 Apache GZIPInputStreamFactory 的 BufferedReaderFactory,并将其设置为 FlatFileItemReader 的 bufferedReaderFactory 属性。
- 配置 Spring Cloud 提供的 SimpleStorageResourceLoader 和一个 AmazonS3Client,用它们从 S3 获取压缩后的平面文件,并将得到的资源设置为 FlatFileItemReader 的 resource 属性。
注意:本示例把每一行读取为 String,也可以很容易地改为读取为 POJO。
GZIPBufferedReaderFactory.java
使用 Apache 的GZIPInputStreamFactory
/**
 * {@link BufferedReaderFactory} that decompresses the resource's content
 * through a {@link GZIPInputStreamFactory} before decoding it as text.
 */
public class GZIPBufferedReaderFactory implements BufferedReaderFactory {

    private final GZIPInputStreamFactory gzipInputStreamFactory;

    /**
     * @param gzipInputStreamFactory factory used to wrap the raw resource stream
     */
    public GZIPBufferedReaderFactory(GZIPInputStreamFactory gzipInputStreamFactory) {
        this.gzipInputStreamFactory = gzipInputStreamFactory;
    }

    /**
     * Opens the resource, wraps its stream with the gzip factory and returns a
     * reader that decodes the decompressed bytes with the given encoding.
     *
     * @param resource the compressed resource to read
     * @param encoding charset name used to decode the decompressed bytes
     * @return a buffered reader over the decompressed content
     * @throws IOException if the resource cannot be opened, the gzip wrapper
     *         cannot be created, or the encoding is not supported
     */
    @Override
    public BufferedReader create(Resource resource, String encoding) throws IOException {
        // Tracks the outermost stream built so far, so a failure at any later
        // stage can still release everything that was already opened.
        java.io.InputStream stream = resource.getInputStream();
        try {
            stream = gzipInputStreamFactory.create(stream);
            return new BufferedReader(new InputStreamReader(stream, encoding));
        } catch (IOException | RuntimeException e) {
            // Without this, a failed wrap or an unsupported encoding would
            // leave the underlying resource stream open.
            try {
                stream.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }
}
AWSConfiguration.java
/**
 * Spring configuration that exposes the Amazon S3 client as a bean.
 */
@Configuration
public class AWSConfiguration {

    /**
     * Builds an {@link AmazonS3Client} from the supplied credentials with a
     * default {@link ClientConfiguration}, then sets its region.
     *
     * @param credentials provider of the AWS credentials handed to the client
     * @param region      region set on the client
     * @return the configured S3 client
     */
    @Bean
    public AmazonS3Client s3Client(AWSCredentialsProvider credentials, Region region) {
        AmazonS3Client s3 = new AmazonS3Client(credentials, new ClientConfiguration());
        s3.setRegion(region);
        return s3;
    }
}
AWSCredentialsProvider 和 Region 这两个 bean 的配置方式因环境而异,其他地方已有相关文档,这里不再详述。
SignalsIndexBatchConfiguration.java
/**
 * Batch configuration wiring a {@link FlatFileItemReader} that reads lines
 * from an S3 resource through a {@link GZIPBufferedReaderFactory}, plus the
 * job, step and launcher built around it.
 */
@Configuration
@EnableBatchProcessing
public class SignalsIndexBatchConfiguration {

    /** S3 client injected from the application context; used to build the S3 resource loader. */
    @Autowired
    public AmazonS3Client s3Client;

    /*
     * The three collaborators below are some of what the
     * @EnableBatchProcessing annotation provides for free.
     */
    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public JobRepository jobRepository;

    /** @return a new {@link GZIPInputStreamFactory} */
    @Bean
    public GZIPInputStreamFactory gzipInputStreamFactory() {
        return new GZIPInputStreamFactory();
    }

    /**
     * @param gzipInputStreamFactory factory handed to the reader factory
     * @return a reader factory built on the given gzip stream factory
     */
    @Bean
    public GZIPBufferedReaderFactory gzipBufferedReaderFactory(GZIPInputStreamFactory gzipInputStreamFactory) {
        return new GZIPBufferedReaderFactory(gzipInputStreamFactory);
    }

    /** @return a resource loader backed by the injected {@link #s3Client} */
    @Bean
    public SimpleStorageResourceLoader simpleStorageResourceLoader() {
        return new SimpleStorageResourceLoader(s3Client);
    }

    /**
     * Step-scoped reader that yields each line of the S3 file as a String.
     *
     * @param simpleStorageResourceLoader loader used to resolve the S3 location
     * @param gzipBufferedReaderFactory   reader factory installed on the item reader
     * @return the configured flat-file reader
     */
    @Bean
    @StepScope
    protected FlatFileItemReader<String> itemReader(
            SimpleStorageResourceLoader simpleStorageResourceLoader,
            GZIPBufferedReaderFactory gzipBufferedReaderFactory) {
        FlatFileItemReader<String> reader = new FlatFileItemReader<>();
        reader.setBufferedReaderFactory(gzipBufferedReaderFactory);
        reader.setResource(simpleStorageResourceLoader.getResource("s3://YOUR_FLAT_FILE.csv"));
        reader.setLineMapper(new PassThroughLineMapper());
        return reader;
    }

    /**
     * @param step the step the job starts with
     * @return a job named "job" that starts with the given step
     */
    @Bean
    public Job job(Step step) {
        return jobBuilderFactory
                .get("job")
                .start(step)
                .build();
    }

    /**
     * Builds the fault-tolerant, chunk-oriented step (chunk size 200) named "step".
     *
     * @param gzipInputStreamFactory factory used to obtain the gzip reader factory
     * @return the configured step
     */
    @Bean
    protected Step step(GZIPInputStreamFactory gzipInputStreamFactory) {
        SimpleStorageResourceLoader s3Loader = simpleStorageResourceLoader();
        GZIPBufferedReaderFactory readerFactory = gzipBufferedReaderFactory(gzipInputStreamFactory);
        FlatFileItemReader<String> reader = itemReader(s3Loader, readerFactory);

        // NOTE(review): itemProcessor() is not declared in this class as shown
        // here; it has to be supplied for this method to compile.
        return stepBuilderFactory.get("step")
                .<String, String> chunk(200)
                .reader(reader)
                .processor(itemProcessor())
                .faultTolerant()
                .build();
    }

    /**
     * Creates a {@link SimpleJobLauncher} on the injected job repository with a
     * {@link SimpleAsyncTaskExecutor} as its task executor.
     *
     * @return the initialized job launcher
     * @throws Exception if {@code afterPropertiesSet()} fails
     */
    @Bean
    public JobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository);
        launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        launcher.afterPropertiesSet();
        return launcher;
    }
}