【发布时间】:2020-01-24 13:18:18
【问题描述】:
我有一个要求,我必须处理一个包含 100 万条记录的文件并将其保存在 redis 缓存中。我应该使用redis管道,但我没有得到任何信息。这是我的问题:Question
所以我决定使用多线程执行器框架。我是多线程新手 这是我的代码:
@Async
public void createSubscribersAsync(Subscription subscription, MultipartFile file)throws EntityNotFoundException, InterruptedException, ExecutionException, TimeoutException {
ExecutorService executorService = Executors.newFixedThreadPool(8);
Collection<Callable<String>> callables = new ArrayList<>();
List<Subscriber> cache = new ArrayList<>();
int batchSize = defaultBatchSize.intValue();
while ((line = br.readLine()) != null) {
try {
Subscriber subscriber = createSubscriber(subscription, line);
cache.add(subscriber);
if (cache.size() >= batchSize) {
IntStream.rangeClosed(1, 8).forEach(i -> {
callables.add(createCallable(cache, subscription.getSubscriptionId()));});
}
} catch (InvalidSubscriberDataException e) {
invalidRows.add(line + ":" + e.getMessage());
invalidCount++;
}
}
List<Future<String>> taskFutureList = executorService.invokeAll(callables);
for (Future<String> future : taskFutureList) {
String value = future.get(4, TimeUnit.SECONDS);
System.out.println(String.format("TaskFuture returned value %s", value));
}
}
private Callable<String> createCallable(List<Subscriber> cache, String subscriptionId) {
return new Callable<String>() {
public String call() throws Exception {
System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
processSubscribers(cache,subscriptionId);
System.out.println(String.format("finished expensive task thread %s", Thread.currentThread().getName()));
return "Finish Thread:" + Thread.currentThread().getName();
}
};
}
private void processSubscribers(List<Subscriber> cache, String subscriptionId) {
subscriberRedisRepository.saveAll(cache);
cache.clear();
}
这里的想法是我想批量拆分文件并使用线程保存该批次。我创建了 8 个线程池。
这是实现执行器框架的正确方法吗?如果不能,你能帮我解决这个问题吗?感谢您的帮助。
【问题讨论】:
-
"这是实现执行器框架的正确方法吗?"你告诉我们。它有效吗?如果是,那就对了。
-
这不会像你想要的那样工作,它在修改列表时将相同的列表传递给 8 个可调用对象。不确定 batchSize 的目的是什么,你的缓存一旦达到 batchSize,就永远不会缩小,添加另一个订阅者会创建另一个批处理。
-
@Firefly 哎呀.. 8 个可调用对象的列表相同?那不是我想要的。实际上我想要的是并行处理 100 万条记录文件。这里的批次是 1k。我希望每个线程都处理新的 1k 记录以提高性能。我应该如何修改我的代码来实现这一点?
-
有什么特别的理由选择 8 作为线程池的大小吗?这似乎是一个奇怪的具体数字。
-
@Martin'sRun No 这样的。我的主要意图是所有线程都应该划分它们的工作并且应该提高性能。目前处理 100 万条记录需要一个多小时。
标签: java multithreading file executorservice