【问题标题】:Executor framework to process 1 million records处理 100 万条记录的 Executor 框架
【发布时间】:2020-01-24 13:18:18
【问题描述】:

我有一个要求,我必须处理一个包含 100 万条记录的文件并将其保存在 redis 缓存中。我应该使用redis管道,但我没有得到任何信息。这是我的问题:Question

所以我决定使用多线程执行器框架。我是多线程新手 这是我的代码:

@Async
    public void createSubscribersAsync(Subscription subscription, MultipartFile file)throws EntityNotFoundException, InterruptedException, ExecutionException, TimeoutException {

        ExecutorService executorService = Executors.newFixedThreadPool(8);
        Collection<Callable<String>> callables = new ArrayList<>();


        List<Subscriber> cache = new ArrayList<>();
        int batchSize = defaultBatchSize.intValue();

        while ((line = br.readLine()) != null) {
            try {
                Subscriber subscriber = createSubscriber(subscription, line);
                cache.add(subscriber);
                if (cache.size() >= batchSize) {
                    IntStream.rangeClosed(1, 8).forEach(i -> {
                    callables.add(createCallable(cache, subscription.getSubscriptionId()));});
                }
            } catch (InvalidSubscriberDataException e) {
                invalidRows.add(line + ":" + e.getMessage());
                invalidCount++;
            }
        }
        List<Future<String>> taskFutureList = executorService.invokeAll(callables);
        for (Future<String> future : taskFutureList) {
            String value = future.get(4, TimeUnit.SECONDS);
            System.out.println(String.format("TaskFuture returned value %s", value));
        }
    }

    private Callable<String> createCallable(List<Subscriber> cache, String subscriptionId) {

        return new Callable<String>() {

            public String call() throws Exception {

                System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
                processSubscribers(cache,subscriptionId);
                System.out.println(String.format("finished expensive task thread %s", Thread.currentThread().getName()));
                return "Finish Thread:" + Thread.currentThread().getName();
            }
        };
    }

    private void processSubscribers(List<Subscriber> cache, String subscriptionId) {
        subscriberRedisRepository.saveAll(cache);
        cache.clear();
    }

这里的想法是我想批量拆分文件并使用线程保存该批次。我创建了 8 个线程池。

这是实现执行器框架的正确方法吗?如果不能,你能帮我解决这个问题吗?感谢您的帮助。

【问题讨论】:

  • "这是实现执行器框架的正确方法吗?"你告诉我们。它有效吗?如果是,那就对了。
  • 这不会像你想要的那样工作,它在修改列表时将相同的列表传递给 8 个可调用对象。不确定 batchSize 的目的是什么,你的缓存一旦达到 batchSize,就永远不会缩小,添加另一个订阅者会创建另一个批处理。
  • @Firefly 哎呀.. 8 个可调用对象的列表相同?那不是我想要的。实际上我想要的是并行处理 100 万条记录文件。这里的批次是 1k。我希望每个线程都处理新的 1k 记录以提高性能。我应该如何修改我的代码来实现这一点?
  • 有什么特别的理由选择 8 作为线程池的大小吗?这似乎是一个奇怪的具体数字。
  • @Martin'sRun No 这样的。我的主要意图是所有线程都应该划分它们的工作并且应该提高性能。目前处理 100 万条记录需要一个多小时。

标签: java multithreading file executorservice


【解决方案1】:

快速修改您当前的代码以完成请求:

一旦当前缓存超过批量大小,在您的 while 循环中,创建一个可调用的传入当前缓存。 重置缓存列表,创建一个新列表并将其分配为缓存。

您正在创建一个可调用对象列表以批量提交它们,为什么不在创建它们之后立即提交您的可调用对象?这将开始将已读取的记录写入 redis,而您的主线程继续从文件中读取。

 List<Future<String>> taskFutureList = new LinkedList<Future<String>>();
 while ((line = br.readLine()) != null) {
    try {
        Subscriber subscriber = createSubscriber(subscription, line);
        cache.add(subscriber);
        if (cache.size() >= batchSize) {
                    taskFutureList.add(executorService.submit(createCallable(cache,subscription.getSubscriptionId())));
            List<Subscriber> cache = new ArrayList<>();
        }
     } catch (InvalidSubscriberDataException e) {
        invalidRows.add(line + ":" + e.getMessage());
        invalidCount++;
    }
}
//submit last batch that could be < batchSize
if(!cache.isEmpty()){ 
           taskFutureList.add(executorService.submit(createCallable(cache,subscription.getSubscriptionId())));
}

您不必存储单独的可调用对象列表。

【讨论】:

  • 我运行了这个。我发现一旦所有线程(8 个线程)都完成了第一批的工作,它们就不会重复。我的意思是,如果线程完成了工作,它应该需要下一批 1000 个订阅者,并且它应该一直持续到所有 100 万个订阅者都被处理完为止,而这现在不会发生
  • @pan1490 您是否像在原始代码中一样对 taskFutureList 中的所有期货执行“future.get()”?
  • 是的,对于所有调用“future.get()”的期货
  • 如果打算等待线程完成运行,你应该删除future.get超时,你能把它改成future.get()而不是future.get(4,TimeUnit.SECONDS)吗?除非您不想等待线程完成,否则就是这样。
  • 当然。我试试看
猜你喜欢
  • 2017-08-30
  • 1970-01-01
  • 1970-01-01
  • 2022-06-10
  • 2017-01-31
  • 1970-01-01
  • 2011-03-10
  • 1970-01-01
  • 2019-06-19
相关资源
最近更新 更多