【问题标题】:Divide elements of an array and process parallelly in chunks using multiple threads使用多个线程划分数组元素并以块的形式并行处理
【发布时间】:2020-06-07 07:20:32
【问题描述】:

我有两个数组,一个有大约 2000 条记录,另一个只有 6 条记录(包含访问令牌)。我想将大数组分成 100 个块,并从其他数组分配一个访问令牌来处理这 100 条记录,并继续这样做,直到处理完所有 2000 条记录。在最后一个访问令牌映射到 100 条记录后,下一组 100 条记录应该再次映射到第一个令牌(我实现了一个循环迭代器以继续从令牌列表中获取令牌)。我确实尝试通过创建(bigarray.length / 100)的线程池来通过执行程序服务来实现它。但是看起来我的多线程逻辑做错了,因为我能够处理所有的 id 并打印它,但是在使用 spring jpa 系统保存在数据库中时,系统崩溃、挂起并给出内存错误:

Out of Memory error
Java heap space
HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=52s882ms437µs947ns).
2020-06-07 13:02:04.195  WARN 8214 --- [ool-1-thread-18] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 0, SQLState: null
2020-06-07 13:02:04.196  WARN 8214 --- [ool-1-thread-10] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 0, SQLState: null

================================================ =============================

private void processIds(MyService service, long[] ids, List<Tokens> tokens) {

        int threadsCount = (int)ids.length / 100;
        ExecutorService executorService = Executors.newFixedThreadPool(threadsCount);
        RoundRobinUtil<Tokens> tokensIterator = new RoundRobinUtil<Tokens>();
        tokensIterator.setInputList(tokens);

        int k = 0;
        int j = 0;
        while(k <= ids.length){
            long[] newIds = new long[100];


            int iterationLength = (ids.length - k) < 100 ? (ids.length - k) : 100;
            for(int i = 0; i<iterationLength; i++, j++){
                newIds[i] = ids[j];  //fetch 100 elements from big array and create a new array //of 100 elements
            }

            executorService.execute(new MyThread(newIds, service, repo, tokensIterator.iterator().next()));   // assigning each 100 elements of the big array to a token //for processing in an independent thread 

            k = k + iterationLength;
       }
    executorService.shutdown();
}

@Data
@NoArgsConstructor
@AllArgsConstructor
Class MyThread extends Thread {
      private long[] ids;
      private Service service;
      private Repository repo;
      private Token token;

      @Override
      public void run() {
        //process all the 100 ids of array with a token
         UserDetails entity = new UserDetails;
         ResponseList<User> details = service.fetchDetails(ids);
         for(User u : details) {
             entity.setName(u.getName);
             repo.save(entity);
         } 
        //save details of 100 ids to database 
      }

}

让我知道该方法是否可行,或者请提出一些更好的方法来处理这个问题。 谢谢!

【问题讨论】:

  • 请参阅下面的 #4 进行修复。

标签: java multithreading spring-boot parallel-processing executorservice


【解决方案1】:

1) 等待 executorService 关闭被认为是一种很好的做法,否则它可能会在后台运行:请参阅 awaitTermination https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#awaitTermination(long,%20java.util.concurrent.TimeUnit)

2) 我不会让像 i、k 和 j 这样的变量名通过我的代码审查;-)

3) 请使用 System.arraycopy(array, 0, n, 0, n) 代替 for 循环。它的速度要快得多。

4) 我会用更合理的启发式替换“threadCount”变量。如果您传递一个很长的 id 列表,这可能会产生不需要的结果。

我发现这在我们的系统中最有效:

Runtime runtime = Runtime.getRuntime();
ExecutorService executor = Executors.newFixedThreadPool(runtime.availableProcessors());

【讨论】:

  • 谢谢,这有助于有效地使用系统资源,从而不会让我的系统挂起。
【解决方案2】:

我试图优化我的代码的几件事,现在它获取结果的速度非常快,系统也没有卡住:

  1. 在 100 个元素的列表中转换了 100 个元素的数组,并在哈希图中分配每个列表。 ArrayList 的性能总是比 Arrays 好。

idList = Arrays.stream(ids.getIDs()).boxed().collect(Collectors.toList()); listMap.put(listMap.size()+1, idList);

  1. 更新了 processIds 方法并在并行 Stream API 的帮助下在其中添加了一些并行处理:

     userListMap.entrySet().parallelStream().forEach(entry -> {
         log.info("now inside map : key "+entry.getKey()+" -- value size :"+entry.getValue().size());
         List<List<Long>> partition = Lists.partition(entry.getValue(), 100);
         partition.stream().parallel().forEach(list -> {
             log.info("now inside list of size:"+ list.size());
             executorService.submit(new MyThread(list.stream().mapToLong(l -> l).toArray(), service, repo,
                     tokens.iterator().next()));
         });
     });
    
    
     log.info("now shutting down  executor service");
     executorService.shutdown();
    
     log.info("*****waiting for task to be completed*****");
     System.out.println("*****waiting for task to be completed*****");
     try {
         executorService.awaitTermination(15, TimeUnit.MINUTES);
     } catch (InterruptedException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
     }
    
  2. 还更新了 MyThread 实现并使用 saveAll 而不是 save inside run 方法:

    repo.saveAll(entities);

================================================ ==========================

所以使用 ArrayLists 而不是 Arrays ,并行处理 hashmaps 和 arraylist 并使用 saveAll 批量保存所有实体是一些有助于优化的技巧。

谢谢!

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-05-20
    • 1970-01-01
    • 2015-12-20
    • 1970-01-01
    • 1970-01-01
    • 2011-02-10
    • 1970-01-01
    相关资源
    最近更新 更多