【问题标题】:Handling multiple StoredProcedureItemReader calls处理多个 StoredProcedureItemReader 调用
【发布时间】:2015-06-13 14:58:08
【问题描述】:

我必须在 Spring Batch 作业中实现以下用例:

  1. 通过StoredProcedureItemReader读取提供者列表
  2. 遍历列表并为步骤 1 中找到的每个提供程序(作为输入参数)调用另一个 StoredProcedureItemReader
  3. 第二个 SP 的输出将写入 CSV。

我想出了以下策略:

  1. 第 1 步开始
  2. SP ItemReader 返回提供者列表。
  3. 在 ItemWriter 中,将提供程序保存到 ExecutionContext
  4. 第 1 步结束
  5. 第 2 步开始
  6. 另一个 SP ItemReader 从ExecutionContext 访问提供程序
  7. 另一个 ItemWriter 使用 FlatFileItemWriter 将响应写入 CSV

我无法理解第二个 SP ItemReader 将如何访问第一个 SP ItemReader 返回的提供程序列表。分区在这里有帮助吗?另外,有没有更好的策略来实现这一点?

编辑 1:

这是 ItemProcessor 的原始实现(作为充实):

@Scope("step")
public class FetchReportFromProviderProcessor implements
        ItemProcessor<RiscProvider, List<SogReportRecord>>, ItemStream {

    StoredProcedureItemReader<SogReportRecord> reader = new StoredProcedureItemReader<SogReportRecord>();
    private DataSource dataSource;

    @Value("#{jobParameters['date']}")
    private String date;

    @Override
    public List<SogReportRecord> process(final RiscProvider item) throws Exception {

        SogReportRecord record = null;
        List<SogReportRecord> records = new ArrayList<SogReportRecord>();

        SqlParameter[] sqlParameters = new SqlParameter[] {new SqlParameter(OracleTypes.CURSOR)};           

        reader.setParameters(sqlParameters);
        reader.setPreparedStatementSetter(new PreparedStatementSetter() {

            @Override
            public void setValues(PreparedStatement ps) throws SQLException {
                ps.setString(0, item.getPrefix());
                ps.setString(1, date);
            }
        });

        while( (record = reader.read()) != null ) {
            records.add(record);
        }

        return records;
    }

    public DataSource getDataSource() {
        return dataSource;
    }

    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    public void open(ExecutionContext executionContext)
            throws ItemStreamException {
        reader.setDataSource(dataSource);
        reader.setProcedureName("RISC_GET_DAYMOVEINOUT");
        reader.open(executionContext);      
    }

    @Override
    public void update(ExecutionContext executionContext)
            throws ItemStreamException {
        reader.update(executionContext);        
    }

    @Override
    public void close() throws ItemStreamException {
        reader.close();     
    }

}

还有 XML 部分:

<batch:job id="SOG_MOVEINOUT_REPORT_GENERATOR">
    <batch:step id="GET_REPORTS">
        <batch:tasklet>
            <batch:chunk reader="getProviders"
                processor="fetchRecordsFromProvider"
                writer="sogReportWriter" commit-interval="500" />
        </batch:tasklet>
    </batch:step>
</batch:job>

<!-- Reader to fetch list of providers -->
<bean id="getProviders" class="org.springframework.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="procedureName" value="RISC_GET_PROVIDER" />
    <property name="parameters">
        <list>
            <bean class="org.springframework.jdbc.core.SqlOutParameter">
                <constructor-arg index="0" value="providers" />
                <constructor-arg index="1">
                    <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR" />
                </constructor-arg>
            </bean>
        </list>
    </property>
    <property name="refCursorPosition" value="1" />
    <property name="rowMapper">
        <bean class="com.kpn.risc.ProviderRowMapper" />
    </property>
</bean>

<bean id="fetchRecordsFromProvider" class="com.kpn.risc.FetchReportFromProviderProcessor">
    <property name="dataSource" ref="dataSource" />
</bean>

<bean id="sogReportWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
    <property name="resource" value="file:///${batch.job.report.dir}/report-#{stepExecutionContext['provider']}.csv" />
    <property name="lineAggregator">
        <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
            <property name="fieldExtractor">
                <bean class="org.springframework.batch.item.file.transform.PassThroughFieldExtractor" />
            </property>
        </bean>
    </property>
</bean>

在上面的代码中,处理器将其工作委托给 SP ItemReader。但是在调用read() 方法之前,读者无法正确初始化。是否可以或建议在ItemProcessor 内调用ItemReader

【问题讨论】:

  • 嘿,你有没有为这种用例找到任何好的解决方案?

标签: java spring-batch


【解决方案1】:

您所描述的内容直接属于批处理的驱动查询模式。从本质上讲,一个查询定义了另一个查询用来驱动它的查询的 ids(或在您的情况下为提供程序)。通常,ItemReader 读取 id 并将每个 ID 传递给 ItemProcessor 以进行丰富(“其他”查询)。然后将结果传递给ItemWriter

您可以在此处的 Spring Batch 文档的 Common Batch Patterns 部分中阅读有关驱动查询模式的更多信息:http://docs.spring.io/spring-batch/trunk/reference/html/patterns.html

【讨论】:

  • 感谢迈克尔的回复。我的第一个想法是选择ItemProcessor 作为补充。我尝试将处理器委托给StoredProcedureItemReader。 (添加有问题的代码)。但是我的读者需要传递给处理器的提供程序。在调用start() 方法之前如何将其传递给读者?
  • 我假设您的意思是您委托的ItemReader 上的open(ExecutionContext context) 方法。由于您的阅读器未在 Spring Batch 中注册为阅读器,因此我们不会自动为您调用 ItemStream 生命周期方法。要注册您的委托阅读器(这将导致调用这些方法),您需要手动将阅读器注册为流。有关如何执行此操作的文档可以在此处的第 5.1.9 节中找到:docs.spring.io/spring-batch/reference/html/configureStep.html
  • 好的,我已将委托阅读器配置为流。但是委托读者需要提供者作为存储过程的输入参数来初始化自身。我在process 处理器方法中有提供程序。如何将其传递给委派的读者?
  • 对不起。我带你走错了路。如果需要传入参数,则需要编写自己的 DAO 而不是使用阅读器,因为阅读器在调用 ItemStream#open 时执行实际语句(打开光标)。使用我们的阅读器的唯一方法是为每个新提供者创建一个新实例。
猜你喜欢
  • 2018-10-22
  • 1970-01-01
  • 1970-01-01
  • 2022-08-02
  • 2016-07-18
  • 2013-03-18
  • 1970-01-01
  • 1970-01-01
  • 2018-12-27
相关资源
最近更新 更多