【发布时间】:2009-12-19 06:45:09
【问题描述】:
我现在有几次相同的需求,并想就构建解决方案的正确方法获得其他想法。需要在许多线程上对许多元素执行一些操作,而不需要一次将所有元素都放在内存中,只需要计算中的元素。例如,Iterables.partition 是不够的,因为它会将所有元素预先放入内存中。
在代码中表达它,我想编写一个 BulkCalc2,它与 BulkCalc1 做同样的事情,只是并行。下面是示例代码,说明了我的最佳尝试。我不满意,因为它又大又丑,但它似乎确实实现了我的目标,即在工作完成之前保持线程的高度利用率,propagating 计算期间的任何异常,并且不超过 numThreads BigThing 的实例必须立即在内存中。
我会以最简洁的方式接受符合既定目标的答案,无论是改进我的 BulkCalc2 的方法还是完全不同的解决方案。
interface BigThing {
int getId();
String getString();
}
class Calc {
// somewhat expensive computation
double calc(BigThing bigThing) {
Random r = new Random(bigThing.getString().hashCode());
double d = 0;
for (int i = 0; i < 100000; i++) {
d += r.nextDouble();
}
return d;
}
}
class BulkCalc1 {
final Calc calc;
public BulkCalc1(Calc calc) {
this.calc = calc;
}
public TreeMap<Integer, Double> calc(Iterator<BigThing> in) {
TreeMap<Integer, Double> results = Maps.newTreeMap();
while (in.hasNext()) {
BigThing o = in.next();
results.put(o.getId(), calc.calc(o));
}
return results;
}
}
class SafeIterator<T> {
final Iterator<T> in;
SafeIterator(Iterator<T> in) {
this.in = in;
}
synchronized T nextOrNull() {
if (in.hasNext()) {
return in.next();
}
return null;
}
}
class BulkCalc2 {
final Calc calc;
final int numThreads;
public BulkCalc2(Calc calc, int numThreads) {
this.calc = calc;
this.numThreads = numThreads;
}
public TreeMap<Integer, Double> calc(Iterator<BigThing> in) {
ExecutorService e = Executors.newFixedThreadPool(numThreads);
List<Future<?>> futures = Lists.newLinkedList();
final Map<Integer, Double> results = new MapMaker().concurrencyLevel(numThreads).makeMap();
final SafeIterator<BigThing> it = new SafeIterator<BigThing>(in);
for (int i = 0; i < numThreads; i++) {
futures.add(e.submit(new Runnable() {
@Override
public void run() {
while (true) {
BigThing o = it.nextOrNull();
if (o == null) {
return;
}
results.put(o.getId(), calc.calc(o));
}
}
}));
}
e.shutdown();
for (Future<?> future : futures) {
try {
future.get();
} catch (InterruptedException ex) {
// swallowing is OK
} catch (ExecutionException ex) {
throw Throwables.propagate(ex.getCause());
}
}
return new TreeMap<Integer, Double>(results);
}
}
【问题讨论】:
-
我要告诉你一件事:TreeMap 在大多数用途中是 Java 集合中最慢的,应该只要尽可能地替换为其他集合之一。如果我能找到时间,我会尝试解决这个问题——它很复杂,我也遇到过类似的问题,所以你有我的同情。良好的并行编码很难,即使在 Java 中也是如此。
标签: java iterator parallel-processing