【问题标题】:Spring Batch process an encoded zipped fileSpring Batch 处理编码的压缩文件
【发布时间】:2015-08-13 09:27:51
【问题描述】:

我正在研究使用 Spring Batch 处理编码压缩文件中的记录。记录是可变长度的,其中包含嵌套的可变长度数据字段。

我是 Spring 和 Spring Batch 的新手,这就是我计划构建批处理配置的方式。

  • ItemReader 需要从压缩 (*.gz) 文件输入流中读取单个记录到 POJO(字节数组)中,该记录的长度将包含在流的前两个字节中。
  • ItemProcessor 将解码字节数组并将信息存储在 POJO 的相关属性中。
  • ItemWriter 将填充数据库。

我最初的问题是了解如何设置 ItemReader,我看过一些使用 FlatFileItemReader 的示例,但我的困难是期望有一个 Line Mapper。在我的情况下,我不知道如何做到这一点(文件中没有行的概念)。

有一些articles 表示使用了自定义的 BufferedReaderFactory,但很高兴看到一个工作示例。

我们将不胜感激。

【问题讨论】:

  • 你能添加解码文件中的记录示例吗?
  • 抱歉延迟回复,我的困惑是基于自定义 ItemReader 中的文件处理,如果我要在 read() 方法中打开和处理文件,我必须跟踪我在文件中的位置等。我设法通过在自定义 ItemReader 的构造函数中创建一个 BufferedInputStream (BufferedInputStream(new GZIPInputStream(new FileInputStream(file))) 来解决这个问题,然后在每次迭代的 read() 方法中处理该流步骤。

标签: spring-batch


【解决方案1】:

如果gzip压缩的文件是简单的txt文件,你只需要一个自定义的BufferedReaderFactory,linemaper然后获取当前行的String

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import org.springframework.batch.item.file.BufferedReaderFactory;
import org.springframework.core.io.Resource;

public class GZipBufferedReaderFactory implements BufferedReaderFactory {

    /** Default value for gzip suffixes. */
    private List<String> gzipSuffixes = new ArrayList<String>() {

        {
            add(".gz");
            add(".gzip");
        }
    };

    /**
     * Creates Bufferedreader for gzip Resource, handles normal resources
     * too.
     * 
     * @param resource
     * @param encoding
     * @return
     * @throws UnsupportedEncodingException
     * @throws IOException 
     */
    @Override
    public BufferedReader create(Resource resource, String encoding)
            throws UnsupportedEncodingException, IOException {
        for (String suffix : gzipSuffixes) {
            // test for filename and description, description is used when 
            // handling itemStreamResources
            if (resource.getFilename().endsWith(suffix)
                    || resource.getDescription().endsWith(suffix)) {
                return new BufferedReader(new InputStreamReader(new GZIPInputStream(resource.getInputStream()), encoding));
            }
        }
        return new BufferedReader(new InputStreamReader(resource.getInputStream(), encoding));
    }

    public List<String> getGzipSuffixes() {
        return gzipSuffixes;
    }

    public void setGzipSuffixes(List<String> gzipSuffixes) {
        this.gzipSuffixes = gzipSuffixes;
    }
}

简单的 itemreader 配置:

 <bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
  <property name="resource" value="#{jobParameters['input.file']}" />
  <property name="lineMapper">
    <bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper" />
  </property>
  <property name="strict" value="true" />
  <property name="bufferedReaderFactory">
    <bean class="your.custom.GZipBufferedReaderFactory" />
  </property>
</bean>

【讨论】:

  • 嗨,Michael,gzip 压缩文件不是简单的 txt 文件。记录是可变长度的,其中包含嵌套的可变长度数据字段。我会更新问题以澄清。
【解决方案2】:

从功能请求票到 Spring Batch (https://jira.spring.io/browse/BATCH-1750):

public class GZIPResource extends InputStreamResource implements Resource {

    public GZIPResource(Resource delegate) throws IOException {
        super(new GZIPInputStream(delegate.getInputStream()));
    }
}

自定义GZipBufferedReaderFactory 不能与FlatFileItemReader 以外的其他对象一起使用。

编辑:懒惰的版本。在调用 getInputStream 之前,它不会尝试打开文件。如果您在程序初始化时创建资源(例如使用自动装配),这可以避免由于文件不存在而导致的异常。

public class GzipLazyResource extends FileSystemResource implements Resource {

    public GzipLazyResource(File file) {
        super(file);
    }

    public GzipLazyResource(String path) {
        super(path);
    }

    @Override
    public InputStream getInputStream() throws IOException {
        return new GZIPInputStream(super.getInputStream());
    }
}

Edit2:这只适用于输入资源

添加另一个类似的方法 getOutputStream 将不起作用,因为 spring 使用 FileSystemResource.getFile,而不是 FileSystemResource.getOutputStream

【讨论】:

    【解决方案3】:

    测试了这种在 S3 中从压缩和编码文件中读取行的简单配置是否有效。

    关键点:

    • 实现一个使用 Apache 的 GZIPInputStreamFactoryBufferedReaderFactory,并将其设置为 FlatFileItemReader 上的 bufferedReaderFactory。
    • 从 Spring Cloud 配置一个 SimpleStorageResourceLoader 和一个 AmazonS3Client,并使用它在 S3 中获取压缩的平面文件。将其设置为 FlatFileItemReader 上的资源。

    注意:读入字符串可以很容易地被读入 POJO 代替。

    GZIPBufferedReaderFactory.java

    使用 Apache 的GZIPInputStreamFactory

    public class GZIPBufferedReaderFactory implements BufferedReaderFactory {
    
        private final GZIPInputStreamFactory gzipInputStreamFactory;
    
        public GZIPBufferedReaderFactory(GZIPInputStreamFactory gzipInputStreamFactory) {
            this.gzipInputStreamFactory = gzipInputStreamFactory;
        }
    
        @Override
        public BufferedReader create(Resource resource, String encoding) throws IOException {
            return new BufferedReader(new InputStreamReader(gzipInputStreamFactory.create(resource.getInputStream()), encoding));
        }
    }
    

    AWSConfiguration.java

    @Configuration
    public class AWSConfiguration {
    
        @Bean
        public AmazonS3Client s3Client(AWSCredentialsProvider credentials, Region region) {
            ClientConfiguration clientConfig = new ClientConfiguration();
    
            AmazonS3Client client = new AmazonS3Client(credentials, clientConfig);
            client.setRegion(region);
            return client;
        }
    }
    

    您如何配置 AWSCredentialsProviderRegion bean 可能会有所不同,我不会在这里详细说明,因为其他地方有文档。

    BatchConfiguration.java

    @Configuration
    @EnableBatchProcessing
    public class SignalsIndexBatchConfiguration {
    
        @Autowired
        public AmazonS3Client s3Client;
    
        @Bean
        public GZIPInputStreamFactory gzipInputStreamFactory() {
            return new GZIPInputStreamFactory();
        }
    
        @Bean
        public GZIPBufferedReaderFactory gzipBufferedReaderFactory(GZIPInputStreamFactory gzipInputStreamFactory) {
            return new GZIPBufferedReaderFactory(gzipInputStreamFactory);
        }
    
        @Bean
        public SimpleStorageResourceLoader simpleStorageResourceLoader() {
            return new SimpleStorageResourceLoader(s3Client);
        }
    
        @Bean
        @StepScope
        protected FlatFileItemReader<String> itemReader(
                SimpleStorageResourceLoader simpleStorageResourceLoader,
                GZIPBufferedReaderFactory gzipBufferedReaderFactory) {
            FlatFileItemReader<String> flatFileItemReader = new FlatFileItemReader<>();
            flatFileItemReader.setBufferedReaderFactory(gzipBufferedReaderFactory);
            flatFileItemReader.setResource(simpleStorageResourceLoader.getResource("s3://YOUR_FLAT_FILE.csv"));
            flatFileItemReader.setLineMapper(new PassThroughLineMapper());
            return flatFileItemReader;
        }
    
        @Bean
        public Job job(Step step) {
            return jobBuilderFactory.get("job").start(step).build();
        }
    
        @Bean
        protected Step step(GZIPInputStreamFactory gzipInputStreamFactory) {
            return stepBuilderFactory.get("step")
                    .<String, String> chunk(200)
                    .reader(itemReader(simpleStorageResourceLoader(), gzipBufferedReaderFactory(gzipInputStreamFactory)))
                    .processor(itemProcessor())
                    .faultTolerant()
                    .build();
        }
    
        /*
         * These components are some of what we
         * get for free with the @EnableBatchProcessing annotation
         */
        @Autowired
        public JobBuilderFactory jobBuilderFactory;
    
        @Autowired
        public StepBuilderFactory stepBuilderFactory;
    
        @Autowired
        public JobRepository jobRepository;
        /*
         * END Freebies
         */
    
        @Bean
        public JobLauncher jobLauncher() throws Exception {
            SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
            jobLauncher.setJobRepository(jobRepository);
            jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
            jobLauncher.afterPropertiesSet();
            return jobLauncher;
        }
    }
    

    【讨论】:

      【解决方案4】:

      我的困惑是基于自定义 ItemReader 中的文件处理,如果我要在 read() 方法中打开和处理文件,我将不得不跟踪我在文件中的位置等。我设法做到了通过在自定义 ItemReader 的构造函数中创建 BufferedInputStream (BufferedInputStream(new GZIPInputStream(new FileInputStream(file))) 来解决这个问题,然后在步骤的每次迭代中在 read() 方法中处理该流。

      【讨论】:

        猜你喜欢
        • 2016-06-29
        • 1970-01-01
        • 2012-05-23
        • 1970-01-01
        • 2012-10-02
        • 2018-05-22
        • 1970-01-01
        • 2020-04-27
        • 2019-02-07
        相关资源
        最近更新 更多