【问题标题】:Spring Batch: One reader, multiple processors and writersSpring Batch:一个阅读器,多个处理器和编写器
【发布时间】:2013-09-30 17:43:20
【问题描述】:

在 Spring 批处理中,我需要将 ItemReader 读取的项目传递给两个不同的处理器和写入器。我想要实现的是......

+---> ItemProcessor#1 ---> ItemWriter#1 | ItemReader ---> 项目 ---+ | +---> ItemProcessor#2 ---> ItemWriter#2

这是必需的,因为与 ItemWriter#2 编写的项目相比,ItemWriter#1 编写的项目应该以完全不同的方式处理。 此外,ItemReader 从数据库中读取项目,它执行的查询计算量非常大,执行两次相同的查询应该被丢弃。

关于如何实现这种设置的任何提示?或者,至少,一个逻辑上等价的设置?

【问题讨论】:

  • 嗨@danidemi,我需要使用同一个编写器写入两个不同的表。我正在使用基于 java 的配置。如何实现这一点?任何帮助
  • 嗨@sashikanta 这里真的没有足够的空间来回答你的问题。您为什么不写一个全新的问题以获得社区的帮助?

标签: spring spring-batch itemprocessor itemwriter


【解决方案1】:

如果您的商品应由处理器 #1 和处理器 #2 处理,则此解决方案有效

您必须使用此签名创建处理器#0:

class Processor0<Item, CompositeResultBean>

其中CompositeResultBean 是一个定义为的bean

class CompositeResultBean {
  Processor1ResultBean result1;
  Processor2ResultBean result2;
}

在您的处理器#0 中,只需将工作委派给处理器#1 和#2,并将结果放入CompositeResultBean

CompositeResultBean Processor0.process(Item item) {
  final CompositeResultBean r = new CompositeResultBean();
  r.setResult1(processor1.process(item));
  r.setResult2(processor2.process(item));
  return r;
}

你自己的作家是CompositeItemWriter,代表作家CompositeResultBean.result1CompositeResultBean.result2(看看PropertyExtractingDelegatingItemWriter,也许能帮上忙)

【讨论】:

  • 遗憾的是,要求确实是同一项目必须由两个处理器处理。只是为了给你更多的上下文,读者从数据库中读取一条记录,然后处理器#1 应该将字段名称映射到其他字段名称,而编写器#1 会将映射的项目写入另一个数据库。处理器#2,从相同的原始项目开始,应该执行完全不同的细化,然后将项目写入遗留系统。
  • 好的,我也有解决这个问题的办法。等待编辑
  • @bellabax - 我很想看到你的解决方案。你有时间在这里更新吗?
【解决方案2】:

我按照 Luca 的建议使用 PropertyExtractingDelegatingItemWriter 作为作者,并且我能够在一个步骤中处理两个不同的实体。

首先我所做的是定义一个 DTO 来存储来自处理器的两个实体/结果

public class DatabaseEntry {
    private AccessLogEntry accessLogEntry;
    private BlockedIp blockedIp;

    public AccessLogEntry getAccessLogEntry() {
        return accessLogEntry;
    }

    public void setAccessLogEntry(AccessLogEntry accessLogEntry) {
        this.accessLogEntry = accessLogEntry;
    }

    public BlockedIp getBlockedIp() {
        return blockedIp;
    }

    public void setBlockedIp(BlockedIp blockedIp) {
        this.blockedIp = blockedIp;
    }
}

然后我将此 DTO 传递给编写器,这是一个 PropertyExtractingDelegatingItemWriter 类,我在其中定义了两个自定义方法将实体写入数据库,请参见下面的编写器代码:

@Configuration
public class LogWriter extends LogAbstract {
    @Autowired
    private DataSource dataSource;

    @Bean()
    public PropertyExtractingDelegatingItemWriter<DatabaseEntry> itemWriterAccessLogEntry() {
        PropertyExtractingDelegatingItemWriter<DatabaseEntry> propertyExtractingDelegatingItemWriter = new PropertyExtractingDelegatingItemWriter<DatabaseEntry>();
        propertyExtractingDelegatingItemWriter.setFieldsUsedAsTargetMethodArguments(new String[]{"accessLogEntry", "blockedIp"});
        propertyExtractingDelegatingItemWriter.setTargetObject(this);
        propertyExtractingDelegatingItemWriter.setTargetMethod("saveTransaction");
        return propertyExtractingDelegatingItemWriter;
    }

    public void saveTransaction(AccessLogEntry accessLogEntry, BlockedIp blockedIp) throws SQLException {
        writeAccessLogTable(accessLogEntry);
        if (blockedIp != null) {
            writeBlockedIp(blockedIp);
        }

    }

    private void writeBlockedIp(BlockedIp entry) throws SQLException {
        PreparedStatement statement = dataSource.getConnection().prepareStatement("INSERT INTO blocked_ips (ip,threshold,startDate,endDate,comment) VALUES (?,?,?,?,?)");
        statement.setString(1, entry.getIp());
        statement.setInt(2, threshold);
        statement.setTimestamp(3, Timestamp.valueOf(startDate));
        statement.setTimestamp(4, Timestamp.valueOf(endDate));
        statement.setString(5, entry.getComment());
        statement.execute();
    }

    private void writeAccessLogTable(AccessLogEntry entry) throws SQLException {
        PreparedStatement statement = dataSource.getConnection().prepareStatement("INSERT INTO log_entries (date,ip,request,status,userAgent) VALUES (?,?,?,?,?)");
        statement.setTimestamp(1, Timestamp.valueOf(entry.getDate()));
        statement.setString(2, entry.getIp());
        statement.setString(3, entry.getRequest());
        statement.setString(4, entry.getStatus());
        statement.setString(5, entry.getUserAgent());
        statement.execute();
    }
}

使用这种方法,您可以从单个读取器获取所需的初始行为以处理多个实体并在一个步骤中保存它们。

【讨论】:

    【解决方案3】:

    您可以使用CompositeItemProcessorCompositeItemWriter

    它看起来与您的架构不完全相同,它是连续的,但它会完成这项工作。

    【讨论】:

    • 我觉得还不够。我将以一个“复杂”处理器结束,但最终只有一个处理过的项目传递给一个 ItemWriter!相反,我需要 ItemWriter#1 来编写一个不是由 ItemWriter#2 编写的项目!一个要求是两个 ItemWriters 作用于不同的项目!请记住,在 spring-batch 中,您有一个三元步骤:一个 ItemReader、一个 ItemProcessor、一个 ItemWriter。
    【解决方案4】:

    这是我想出的解决方案。

    因此,我们的想法是编写一个“包含”ItemProcessor 和 ItemWriter 的新 Writer。只是为了给你一个想法,我们称之为 PreprocessoWriter,这就是核心代码。

    private ItemWriter<O> writer;
    private ItemProcessor<I, O> processor;
    
    @Override
    public void write(List<? extends I> items) throws Exception {
        List<O> toWrite = new ArrayList<O>();
        for (I item : items) {
            toWrite.add(processor.process(item));
        }
        writer.write(toWrite);
    }
    

    有很多东西被搁置一旁。例如,ItemStream 的管理。但在我们的特定场景中,这就足够了。

    所以你可以将多个 PreprocessorWriter 与 CompositeWriter 结合起来。

    【讨论】:

    • 将处理器和编写器混为一谈是个坏主意:此组件已分别创建,以使进程和编写两个独立的关注点
    • 我同意你的一般概念。但是,正如您在原始问题中肯定读到的那样,问题是读取项目一次,并以两种完全不同的方式处理/写入它们。那么,您能否分享一下您将如何解决问题所涉及的特定问题?
    • 检查我的答案。处理是使用复合处理器完成的,该处理器将单个项目作为输入,而自定义 bean (CompositeResultBean) 则保存多个处理的结果。使用委托编写:CompositeResultBean 是输入,并为 CompositeResultBean.result1 和 CompositeResultBean.result2 调用正确的委托编写器。读取一次,使用组合和委托使用分离的 ItemProcessor/ItemWriter 处理/写入。一切都没有混淆概念
    【解决方案5】:

    如果您有合理数量的项目(例如少于 1 个 Go),还有另一种解决方案:您可以将选择的结果缓存到包装在 Spring bean 中的集合中。

    然后你可以免费阅读该集合两次。

    【讨论】:

    • 是的,确实可以做到。但是如果第一个写者在写一个项目时失败了怎么办?如果我的想法正确,我认为第二位作家将没有机会写任何项目。对吗?
    • 嗯,不,作者没有互相传递数据,他们从一个 spring 单例 bean 内部读取内存集合中的相同数据。
    猜你喜欢
    • 1970-01-01
    • 2019-06-04
    • 1970-01-01
    • 2017-01-15
    • 2018-02-06
    • 1970-01-01
    • 1970-01-01
    • 2021-06-23
    • 1970-01-01
    相关资源
    最近更新 更多