【Question Title】: Processing Huge CSV File using Producer-Consumer Pattern
【Posted】: 2019-08-14 05:55:42
【Question Description】:

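I am trying to process arbitrary CSV files that can range from 10 records to millions of records. Each CSV file has 4 fixed columns (e.g. a, b, c, d) plus 2 additional columns (e, f) whose values come from an external REST API.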

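My goal is to read all records from the CSV, call the external REST API for each record to fetch the 2 additional columns, and write the result out as a merged CSV. The output should be the same CSV file with columns (a, b, c, d, e, f).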

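I implemented this scenario using the Content Enricher pattern from EIP with Spring Integration and was able to get the expected output. However, because I read the CSV file sequentially, this solution only works well for a small number of records: as soon as the number of records grows, the program's execution time grows linearly with it (O(n)).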

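I then started implementing the Producer-Consumer design pattern, structuring the code so that each record read from the CSV is put into a queue with put(), and multiple consumers then read from the same shared queue using BlockingQueue's take() method. The main program uses Executors.newFixedThreadPool(3) to instantiate an ExecutorService with 1 producer and multiple consumers, but I am facing a couple of challenges: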

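  1. The take() method never exits. I tried implementing a Poison Pill by adding a terminator object and then checking for that same poison pill in the Consumer loop to break out, but it still never breaks out. (I added a System.out in the loop to see whether it ever reaches the Poison Pill, and it does print my statement.) So why doesn't it exit?

  2. The CSV file only keeps the data read by the consumer thread that executed last, and everything written by the other consumers is overwritten. I am using OpenCSV to read/write the CSV data.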

Here is my code so far. Can someone point out the mistakes in this code and what needs to be improved?

Main program


BlockingQueue<Account> queue = new ArrayBlockingQueue<>(100);
AccountProducer readingThread = new AccountProducer(inputFileName, queue);
//new Thread(readingThread).start();
ExecutorService producerExecutor = Executors.newFixedThreadPool(1);
producerExecutor.submit(readingThread);

AccountConsumer normalizers = new AccountConsumer(outputFileName, queue, accountService);
ExecutorService consumerExecutor = Executors.newFixedThreadPool(3);

for (int i = 1; i <= 3; i++) {
    consumerExecutor.submit(normalizers);
}
producerExecutor.shutdown();
consumerExecutor.shutdown();

AccountProducer

public class AccountProducer implements Runnable {
private String inputFileName;
private BlockingQueue<Account> blockingQueue;
private static final String TERMINATOR = "TERMINATOR";

public AccountProducer (String inputFileName, BlockingQueue<Account> blockingQueue) {

    this.inputFileName = inputFileName;
    this.blockingQueue = blockingQueue;
}


@Override
public void run() {

    try (Reader reader = Files.newBufferedReader(Paths.get(inputFileName));) {

        PropertyEditorManager.registerEditor(java.util.Date.class, DateEditor.class);
        ColumnPositionMappingStrategy<Account> strategy = new ColumnPositionMappingStrategy<>();
        strategy.setType(Account.class);
        String[] memberFieldsToBindTo = { "accountId", "accountName", "firstName", "createdOn" };
        strategy.setColumnMapping(memberFieldsToBindTo);

        CsvToBean<Account> csvToBean = new CsvToBeanBuilder<Account>(reader).withMappingStrategy(strategy)
                .withSkipLines(1).withIgnoreLeadingWhiteSpace(true).build();

        Iterator<Account> csvAccountIterator = csvToBean.iterator();

        while (csvAccountIterator.hasNext()) {
            Account account = csvAccountIterator.next();    
            // Checking if the Account Id in the csv is blank / null - If so, we skip the
            // row for processing and hence avoiding API call..
            if (null == account.getAccountId() || account.getAccountId().isEmpty()) {
                continue;
            } else {
                // This call will send the extracted Account Object down the Enricher to get
                // additional details from API
                blockingQueue.put(account);
            }
        }
    } catch (InterruptedException | IOException ex) {
        System.out.println(ex);
    } finally {
        while (true) {
            try {
                Account account = new Account();
                account.setAccountId(TERMINATOR);
                blockingQueue.put(account);
                break;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
}

AccountConsumer

public class AccountConsumer implements Runnable {

private String outputFileLocation;
private BlockingQueue<Account> blockingQueue;
private AccountService accountService;

public AccountConsumer(String outputFileLocation, BlockingQueue<Account> blockingQueue, AccountService accountService) {
    this.blockingQueue = blockingQueue;
    this.outputFileLocation = outputFileLocation;
    this.accountService = accountService;
}

@Override
public void run() {
    List<Account> accounts = new ArrayList<>();

    try {
        while (true) {
            Account account = blockingQueue.poll();
            account = accountService.populateAccount(account);
            accounts.add(account);
        }

    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } catch (Exception ex) {
        System.out.println(ex);
    }
    processOutput(accounts, outputFileLocation);
}

/**
 * The method processOutput simply takes the list of Accounts and writes them to
 * CSV.
 * 
 * @param outputFileName
 * @param accounts
 * @throws Exception
 */
private void processOutput(List<Account> accounts, String outputFileName) {

    System.out.println("List Size is : " + accounts.size());
    // Using try with Resources block to make sure resources are released properly
    try (Writer writer = new FileWriter(outputFileName, true);) {
        StatefulBeanToCsv<Account> beanToCsv = new StatefulBeanToCsvBuilder(writer).build();
        beanToCsv.write(accounts);
    } catch (CsvDataTypeMismatchException | CsvRequiredFieldEmptyException ex) {
        System.out.println(ex);
        //logger.error("Unable to write the output CSV File : " + ex);
        //throw ex;
    } catch (IOException e) {
        e.printStackTrace();
    }
}

}

Here is the Spring Integration XML I am using:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context 
http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/task 
http://www.springframework.org/schema/task/spring-task.xsd
    http://www.springframework.org/schema/integration 
http://www.springframework.org/schema/integration/spring-integration.xsd"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:beans="http://www.springframework.org/schema/beans" 
xmlns:task="http://www.springframework.org/schema/task">

<channel id="accountChannel" /> 
<!-- accountOutputChannel is used for putting the Account object on the 
Channel which will then be consumed by accountAPIChannel as Input to the API 
-->
<channel id="accountOutputChannel" />
<!-- accountAPIChannel will take 1 accountId at a time and invoke REST API 
Service to get additional details required to fill the Content Enricher -->
<channel id="accountAPIChannel" />

<!-- accountGateway is the entry point to the utility -->
<gateway id="accountGateway" default-request-timeout="5000"
    default-reply-timeout="5000"
    service-interface="com.epe.service.AccountService"
    default-request-channel="accountChannel">
</gateway>

<!--Content  enricher is used here for enriching an existing message with 
additional data from External API
     This is based on EIP Pattern - Content Enricher -->
<enricher id="enricher" input-channel="accountOutputChannel"
    request-channel="accountAPIChannel">
    <property name="status" expression="payload.status" />
    <property name="statusSetOn" expression="payload.createdOn" />
</enricher>

<beans:bean id="accountService"
    class="com.epe.service.impl.AccountServiceImpl" />

<!-- Below service-activator is used to actually invoke the external REST 
API which will provide the additional fields for enrichment -->
<service-activator id="fetchAdditionalAccountInfoServiceActivator"
    ref="accountInfoService" method="getAdditionalAccountInfoService" 
input-channel="accountAPIChannel"
    />

<!-- accountInfoService is a bean which will be used for fetching 
additional information from REST API Service -->
<beans:bean id="accountInfoService"
    class="com.epe.service.impl.AccountInfoServiceImpl" />

</beans:beans>

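【Question Comments】:

  • I think you need to redesign the way you handle InterruptedException in the producer and the consumer.
  • Could you show me an example with respect to my code above?

Tags: java spring spring-integration producer-consumer opencsv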


【Solution 1】:

You are using poll() in your code, not take().

You should use poll() with a timeout instead, e.g. poll(10, TimeUnit.SECONDS), so that each thread can terminate gracefully.
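A minimal sketch of the poll-with-timeout idea, combined with the poison pill from the question. It assumes plain `String` records in place of `Account` and uses `toUpperCase()` as a stand-in for the REST call; `PollWithTimeoutDemo` and `POISON_PILL` are hypothetical names. Two details matter: the producer enqueues one pill per consumer (a single pill only stops whichever consumer takes it first), and `poll(10, TimeUnit.SECONDS)` returns `null` on timeout, so no consumer can block forever.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PollWithTimeoutDemo {
    // Hypothetical sentinel, compared by identity so real data can never match it.
    static final String POISON_PILL = new String("TERMINATOR");

    static List<String> run(List<String> records, int consumers) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        List<String> results = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(consumers + 1);

        // Producer: enqueue every record, then one pill PER consumer.
        pool.submit(() -> {
            try {
                for (String item : records) {
                    queue.put(item);
                }
                for (int n = 0; n < consumers; n++) {
                    queue.put(POISON_PILL);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumers: stop on the pill, or if nothing arrives within the timeout.
        for (int i = 0; i < consumers; i++) {
            pool.submit(() -> {
                try {
                    while (true) {
                        String item = queue.poll(10, TimeUnit.SECONDS);
                        if (item == null || item == POISON_PILL) {
                            break;
                        }
                        results.add(item.toUpperCase()); // stand-in for the REST enrichment
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }
}
```

With 3 consumers, each one receives its own pill and exits, so the pool terminates without waiting for the 10-second timeout; the timeout is only a safety net if the producer dies before sending the pills.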

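However, you don't need any of this logic; you can do all of it with Spring Integration components: an ExecutorChannel, plus a file outbound channel adapter in append mode.

EDIT

I don't have time to write your whole application, but basically you need...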

<file:inbound-channel-adapter />
<file:splitter output-channel="execChannel"/>
<int:channel id="execChannel">
    <int:dispatcher task-executor="exec" />
</int:channel>
<int:transformer /> <!-- OpenCSV -->
<int:enricher ... />
<int:transformer /> <!-- OpenCSV -->
<int:resequencer /> <!-- restore order -->
<file:outbound-channel-adapter />

You can read about all of these components in the reference manual.
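To illustrate what the resequencer step contributes: once records are enriched in parallel on the executor channel, they can finish out of order. The plain-Java class below is a hypothetical illustration of the idea, not Spring Integration's resequencer implementation. Results are buffered by sequence number (0-based in this sketch, an assumption) and released only once the next expected number has arrived.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SimpleResequencer<T> {
    private final Map<Integer, T> pending = new HashMap<>(); // results that arrived early
    private int next = 0; // sequence number that must be released next

    // Accepts one result and returns every result that can now be released in order.
    synchronized List<T> accept(int sequence, T value) {
        pending.put(sequence, value);
        List<T> ready = new ArrayList<>();
        while (pending.containsKey(next)) {
            ready.add(pending.remove(next));
            next++;
        }
        return ready;
    }
}
```

Feeding it `(2, "c")` and then `(1, "b")` releases nothing, and `(0, "a")` then releases `[a, b, c]` at once. A real resequencer also needs a policy for sequence numbers that never arrive, otherwise `pending` keeps growing.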

You might also want to consider using the Java DSL instead of XML; something like...

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(Files.inboundAdapter(...))
              .split(Files.splitter())
              .channel(MessageChannels.executor(exec()))
              .transform(...)
              .enrich(...)
              .transform(...)
              .resequence()
              .handle(Files.outboundAdapter(...))
              .get();
}

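【Comments】:

  • Thanks Gary, could you give an example of what you are suggesting? I am still new to Spring Integration.
  • FileSplitter->executorChannel->openCSV->enricher->openCSV->fileOutboundAdapter. If you want to preserve the record order, you need to add a resequencer. If you need me to be more specific, you need to show your current Spring Integration configuration.
  • Added the XML to the original post. I also added comments on each component in the XML so you can see what I am doing on the Java side and on the Spring XML side. Please let me know if I need to provide anything else.
  • Thanks Gary, your idea of using poll with a time unit worked for me. That was the only missing piece; I can now exit gracefully. My second problem, data being overwritten, was solved by introducing another queue that is solely responsible for writing the CSV.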
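A minimal sketch of that "dedicated writer queue" fix, under stated assumptions: rows are plain strings, appending `,e,f` stands in for the REST call, a `BufferedWriter` replaces OpenCSV's writer, and `SingleWriterDemo`/`END` are hypothetical names. Several workers hand finished rows to one queue, and exactly one writer thread owns the output file, so no worker can overwrite another's output.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class SingleWriterDemo {
    private static final String END = new String("END"); // identity-compared sentinel

    static void run(List<String> rows, int workers, Path output) {
        BlockingQueue<String> writeQueue = new LinkedBlockingQueue<>();
        ExecutorService writer = Executors.newSingleThreadExecutor();
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        // The only thread that opens and writes the output file.
        Future<?> writerDone = writer.submit(() -> {
            try (BufferedWriter out = Files.newBufferedWriter(output)) {
                String row;
                while ((row = writeQueue.take()) != END) {
                    out.write(row);
                    out.newLine();
                }
            }
            return null; // a Callable, so I/O or interrupt failures surface in get()
        });

        // Workers enrich rows in parallel; appending ",e,f" stands in for the REST call.
        for (String row : rows) {
            pool.submit(() -> {
                writeQueue.add(row + ",e,f");
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES); // every enriched row is queued
            writeQueue.put(END);                        // then tell the writer to finish
            writerDone.get();                           // rethrows any writer failure
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            writer.shutdown();
        }
    }

    static List<String> demo() {
        try {
            Path tmp = Files.createTempFile("enriched", ".csv");
            run(Arrays.asList("1,a,b,c", "2,a,b,c", "3,a,b,c"), 3, tmp);
            return Files.readAllLines(tmp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The file ends up with every enriched row, but in whatever order the workers finished; preserving input order would need sequence numbers and a resequencing step.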
【Solution 2】:

In AccountProducer

catch (InterruptedException | IOException ex) {
  System.out.println(ex);
 } 

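This is not the correct way to handle InterruptedException. ExecutorService uses interruption to force a shutdown (shutdownNow()), but because you swallow the interrupt, your task will not respond to a forced shutdown.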
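A minimal sketch of the alternative, assuming a bounded queue and a producer that simply counts upward as a stand-in for reading CSV rows (`InterruptAwareProducer` is a hypothetical name). On `InterruptedException` it restores the interrupt flag and returns from `run()`, so `shutdownNow()` can actually stop it even while it is blocked in `put()`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class InterruptAwareProducer implements Runnable {
    private final BlockingQueue<Integer> queue;

    InterruptAwareProducer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        int next = 0; // stand-in for "the next CSV record"
        try {
            while (!Thread.currentThread().isInterrupted()) {
                queue.put(next++); // blocks once the bounded queue is full
            }
        } catch (InterruptedException e) {
            // Do not swallow it: restore the flag for the owner, then leave run().
            Thread.currentThread().interrupt();
        }
    }

    // Returns true if the executor terminated after shutdownNow().
    static boolean demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(new InterruptAwareProducer(new ArrayBlockingQueue<>(10)));
        try {
            Thread.sleep(200);      // let the producer fill the queue and block in put()
            executor.shutdownNow(); // interrupts the blocked put()
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

If the catch block only printed the exception and kept looping, the next `put()` would block again on the full queue and the executor would never terminate.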

In AccountConsumer

catch (InterruptedException e) {
 Thread.currentThread().interrupt();
}

This restores the thread's interrupt status (it does not re-throw the InterruptedException). The loop can be redesigned as:

try {
    while (true) {
        Account account = blockingQueue.take();
        account = accountService.populateAccount(account);
        accounts.add(account);
        if (Thread.currentThread().isInterrupted()) {
            System.out.println("Thread interrupted and hence exiting...");
            break;
        }
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
} catch (Exception ex) {
    System.out.println(ex);
}

EDIT: Also, calling shutdown() on an ExecutorService does not terminate it immediately.

A good way to shut down an ExecutorService is to follow shutdown() with awaitTermination().
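A sketch of the two-phase shutdown pattern described in the `ExecutorService` Javadoc: `shutdown()` stops accepting new tasks, `awaitTermination(...)` waits for submitted tasks to finish, and `shutdownNow()` interrupts anything still running after the timeout. `GracefulShutdown` and its methods are hypothetical names, and the 5-second timeout is arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GracefulShutdown {
    // Stop accepting work, wait, then interrupt whatever is still running.
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // already-submitted tasks keep running
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // interrupts running tasks (they must honor interrupts)
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    static int demo() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 5; i++) {
            pool.submit(() -> {
                completed.incrementAndGet();
            });
        }
        shutdownAndAwait(pool, 5, TimeUnit.SECONDS);
        return completed.get(); // shutdown() does not cancel queued tasks, so all of them ran
    }
}
```

The `shutdownNow()` fallback only helps if the tasks react to interruption, which is why the consumer and producer loops above need to restore and check the interrupt flag.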

【Comments】:
