【问题标题】:Synchronisation object to ensure all tasks are completed同步对象,确保所有任务完成
【发布时间】:2015-12-14 08:20:43
【问题描述】:

我应该使用哪个 Java 同步对象来确保完成任意数量的任务?限制是:

  1. 每项任务都需要很长时间才能完成,因此适合并行执行任务。
  2. 有太多任务无法放入内存(即,我不能将每个任务的 Future 放入 Collection 中,然后在所有期货上调用 get)。
  3. 我不知道会有多少任务(即我不能使用CountDownLatch)。
  4. ExecutorService 可以共享,所以我不能使用awaitTermination( long, TimeUnit )

例如,对于 Grand Central Dispatch,我可能会这样做:

let workQueue = dispatch_get_global_queue( QOS_CLASS_BACKGROUND, 0 )
let latch = dispatch_group_create()
let startTime = NSDate()
var itemsProcessed = 0
let countUpdateQueue = dispatch_queue_create( "countUpdateQueue", DISPATCH_QUEUE_SERIAL )
for item in fetchItems() // generator returns too many items to store in memory
{
    dispatch_group_enter( latch )
    dispatch_async( workQueue )
    {
        self.processItem( item ) // method takes a non-trivial amount of time to run
        dispatch_async( countUpdateQueue )
        {
            itemsProcessed++
        }
        dispatch_group_leave( latch )
    }
}
dispatch_group_wait( latch, DISPATCH_TIME_FOREVER )
let endTime = NSDate()
let totalTime = endTime.timeIntervalSinceDate( startTime )
print( "Processed \(itemsProcessed) items in \(totalTime) seconds." )

它产生如下所示的输出(128 个项目):Processed 128 items in 1.846794962883 seconds.

我用Phaser尝试了类似的东西:

final Executor executor = new ThreadPoolExecutor( 64, 64, 1l, MINUTES, new LinkedBlockingQueue<Runnable>( 8 ), new CallerRunsPolicy() );
final Phaser latch = new Phaser( 0 );
final long startTime = currentTimeMillis();
final AtomicInteger itemsProcessed = new AtomicInteger( 0 );
for( final String item : fetchItems() ) // iterator returns too many items to store in memory
{
    latch.register();
    final Runnable task = new Runnable() {
        public void run() {
            processItem( item ); // method takes a non-trivial amount of time to run
            itemsProcessed.incrementAndGet();
            latch.arrive();
        }
    };
    executor.execute( task );
}
latch.awaitAdvance( 0 );
final long endTime = currentTimeMillis();
out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );

这些任务并不总是在最后一个打印语句之前完成,我可能会得到如下所示的输出(对于 128 个项目):Processed 121 items in 5.296 seconds. Phaser 甚至是正确使用的对象吗?文档表明它仅支持 65,535 方,因此我需要批量处理要处理的项目或引入某种 Phaser 分层。

【问题讨论】:

  • 当您说有太多任务无法放入内存时,这不是资源限制吗?因此,您可以并行执行适合内存的任务,并且可以使用 Future 来完成它。不确定是否有任何方法有助于通过资源限制,最终您将得到一个非常复杂的解决方案。

标签: java multithreading concurrency synchronization blocking


【解决方案1】:

本例中Phaser 用法的问题在于CallerRunsPolicy 允许任务在启动线程上执行。因此,当循环仍在进行时,到达方的数量可能等于注册方的数量,从而导致阶段增加。解决方案是用 1 方初始化 Phaser 然后,当循环完成时,到达并等待其他方到达。这可确保在所有任务完成之前阶段不会递增到 1。

final Executor executor = new ThreadPoolExecutor( 64, 64, 1l, MINUTES, new LinkedBlockingQueue<Runnable>( 8 ), new CallerRunsPolicy() );
final Phaser latch = new Phaser( 1 );
final long startTime = currentTimeMillis();
final AtomicInteger itemsProcessed = new AtomicInteger( 0 );
for( final String item : fetchItems() ) // iterator returns too many items to store in memory
{
    latch.register();
    final Runnable task = new Runnable() {
        public void run() {
            processItem( item ); // method takes a non-trivial amount of time to run
            itemsProcessed.incrementAndGet();
            final int arrivalPhase = latch.arrive();
        }
    };
    executor.execute( task );
}
latch.arriveAndAwaitAdvance();
final long endTime = currentTimeMillis();
out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );

【讨论】:

    【解决方案2】:

    “确保完成任意数量的任务” - 最简单的方法是维护已完成任务的计数器,并使用阻塞操作等待达到给定数量的任务。没有这样的现成类,但很容易制作:

    class EventCounter {
       long counter=0;
    
       synchronized void up () {
         counter++;
         notifyAll();
       }
       synchronized void ensure (long count) {
         while (counter<count) wait();
       }
     }
    

    “有太多任务无法放入内存”——所以当正在运行的任务数量过多时,必须暂停提交新任务的进程。最简单的方法是将运行任务的数量视为一种资源,并用信号量来计算:

    Semaphore runningTasksSema=new Semaphore(maxNumberOfRunningTasks);
    EventCounter  eventCounter =new EventCounter ();
    
    for( final String item : fetchItems() ) {
        final Runnable task = new Runnable() {
           public void run() {
                processItem( item ); 
                runningTasksSema.release();
                eventCounter.up();
           }
        };
       runningTasksSema.aquire();
       executor.execute(task);
    }
    

    当一个线程想要确保完成给定数量的任务时,它会调用:

    eventCounter.ensure(givenNumberOfFinishedTasks);
    

    可以设计异步(非阻塞)版本的runningTasksSema.aquire()eventCounter.ensure() 操作,但它们会更复杂。

    【讨论】:

      【解决方案3】:

      如果你使用的是 java8,你可以使用 CompletableFuture

      java.util.concurrent.CompletableFuture.allOf(CompletableFuture<?>... cfs)
      

      这将等待传递数组中所有期货的结果。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2016-01-15
        • 1970-01-01
        • 2012-04-15
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-04-14
        相关资源
        最近更新 更多