【发布时间】:2015-12-14 08:20:43
【问题描述】:
我应该使用哪个 Java 同步对象来确保完成任意数量的任务?限制是:
- 每项任务都需要很长时间才能完成,因此适合并行执行任务。
- 有太多任务无法放入内存(即,我不能将每个任务的
Future放入Collection中,然后在所有期货上调用get)。 - 我不知道会有多少任务(即我不能使用
CountDownLatch)。 -
ExecutorService可以共享,所以我不能使用awaitTermination( long, TimeUnit )
例如,对于 Grand Central Dispatch,我可能会这样做:
let workQueue = dispatch_get_global_queue( QOS_CLASS_BACKGROUND, 0 )
let latch = dispatch_group_create()
let startTime = NSDate()
var itemsProcessed = 0
let countUpdateQueue = dispatch_queue_create( "countUpdateQueue", DISPATCH_QUEUE_SERIAL )
for item in fetchItems() // generator returns too many items to store in memory
{
dispatch_group_enter( latch )
dispatch_async( workQueue )
{
self.processItem( item ) // method takes a non-trivial amount of time to run
dispatch_async( countUpdateQueue )
{
itemsProcessed++
}
dispatch_group_leave( latch )
}
}
dispatch_group_wait( latch, DISPATCH_TIME_FOREVER )
let endTime = NSDate()
let totalTime = endTime.timeIntervalSinceDate( startTime )
print( "Processed \(itemsProcessed) items in \(totalTime) seconds." )
它产生如下所示的输出(128 个项目):Processed 128 items in 1.846794962883 seconds.
我用Phaser尝试了类似的东西:
final Executor executor = new ThreadPoolExecutor( 64, 64, 1l, MINUTES, new LinkedBlockingQueue<Runnable>( 8 ), new CallerRunsPolicy() );
final Phaser latch = new Phaser( 0 );
final long startTime = currentTimeMillis();
final AtomicInteger itemsProcessed = new AtomicInteger( 0 );
for( final String item : fetchItems() ) // iterator returns too many items to store in memory
{
latch.register();
final Runnable task = new Runnable() {
public void run() {
processItem( item ); // method takes a non-trivial amount of time to run
itemsProcessed.incrementAndGet();
latch.arrive();
}
};
executor.execute( task );
}
latch.awaitAdvance( 0 );
final long endTime = currentTimeMillis();
out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );
这些任务并不总是在最后一个打印语句之前完成,我可能会得到如下所示的输出(对于 128 个项目):Processed 121 items in 5.296 seconds. Phaser 甚至是正确使用的对象吗?文档表明它仅支持 65,535 方,因此我需要批量处理要处理的项目或引入某种 Phaser 分层。
【问题讨论】:
-
当您说有太多任务无法放入内存时,这不是资源限制吗?因此,您可以并行执行适合内存的任务,并且可以使用 Future 来完成它。不确定是否有任何方法有助于通过资源限制,最终您将得到一个非常复杂的解决方案。
标签: java multithreading concurrency synchronization blocking