【问题标题】:Efficient algorithm to distribute work?分配工作的有效算法?
【发布时间】:2011-02-01 15:01:07
【问题描述】:

解释起来有点复杂,但我们开始吧。基本上,问题是“如何以有效的方式将问题分解为子问题”。这里的“高效”意味着分解的子问题尽可能大。基本上,如果我根本不需要分解问题,那将是理想的。然而,因为一个工人只能处理特定的问题,我确实需要分手。但我想找到一种尽可能粗略的方法。

这是一些伪代码..

我们遇到了这样的问题(抱歉,这是在 Java 中。如果您不明白,我很乐意解释)。

class Problem {
    final Set<Integer> allSectionIds = { 1,2,4,6,7,8,10 };
    final Data data = //Some data
}

还有一个子问题是:

class SubProblem {
    final Set<Integer> targetedSectionIds;
    final Data data;

    SubProblem(Set<Integer> targetedSectionsIds, Data data){
        this.targetedSectionIds = targetedSectionIds;
        this.data = data;
    }
}

那么,工作将如下所示。

class Work implements Runnable {
    final Set<Section> subSections;
    final Data data;
    final Result result;

    Work(Set<Section> subSections, Data data) {
        this.sections = SubSections;
        this.data = data;
    }

    @Override
    public void run(){
        for(Section section : subSections){
            result.addUp(compute(data, section));
        }
    }
}

现在我们有了 'Worker' 的实例,它们有自己的状态 sections I have

class Worker implements ExecutorService {
    final Map<Integer,Section> sectionsIHave;
    {
        sectionsIHave = {1:section1, 5:section5, 8:section8 };
    }

    final ExecutorService executor = //some executor.

    @Override
    public void execute(SubProblem problem){
        Set<Section> sectionsNeeded = fetchSections(problem.targetedSectionIds);
        super.execute(new Work(sectionsNeeded, problem.data);
    }

}

呼。

所以,我们有很多Problems 和Workers 不断要求更多SubProblems。我的任务是将Problems 分解为SubProblem 并将其交给他们。然而,困难在于,我必须稍后收集子问题的所有结果并将它们合并(减少)到整个 ProblemResult 中。

然而,这很昂贵,所以我想给工人提供尽可能大的“块”(尽可能多的targetedSections)。

它不必是完美的(在数学上尽可能高效或其他)。我的意思是,我想不可能有一个完美的解决方案,因为你无法预测每次计算需要多长时间等等。但是有一个很好的启发式解决方案吗?或者也许我可以在开始设计之前阅读一些资源?

非常感谢任何建议!

编辑: 我们也可以控制部分分配,因此控制它是另一种选择。基本上,唯一的限制是一个工人只能有固定数量的部分。

【问题讨论】:

  • 我真的不知道它是否适用,因为我对它的了解还不够,但 Fork/Join 似乎是一种用于执行此操作的算法。 ibm.com/developerworks/java/library/j-jtp11137.html
  • 谢谢。我正试图绕过那个。但问题是,即使我使用了那个框架,我仍然必须提供分割任务等的逻辑。所以我仍然会遇到这个问题..
  • 分布式计算无疑是一项不平凡的任务,也是高性能计算 (HPC) 研究的活跃领域。一本不错的本科课本是 Michael Quinn 的“使用 MPI 和 OpenMP 进行并行编程”,McGraw-Hill ISBM 0-07-282256-2
  • Section->Worker 分配背后有什么原因吗?部分是否带有只能在单个线程中访问的可变状态?
  • 其实它们不是线程,而是不同的机器。所以我们通过网络发送子问题。它确实是一种只能在运行时知道的可变状态,并且只能被某些机器访问(虽然不一定是单一的)。对不起,我应该提到这一点..

标签: java distributed mapreduce concurrent-programming


【解决方案1】:

好的,您的网络服务似乎有一个分片模型,我们做了类似的事情,我们使用“entityId”(sectionId)的反向索引到将连接到特定网络的“client”(worker)将处理该特定实体的服务。最简单的方法(见下文)是使用 id 到 worker 的反向映射。如果内存是一个约束,另一种可能性是使用一个函数(例如 sectionId % 服务数)。

为了让服务尽可能多地工作,有一个简单的批处理算法,可以将批处理填充到某个用户指定的最大值。这将允许根据远程服务使用它们的速度来粗略地调整工作块的大小。

public class Worker implements Runnable {

    private final Map<Integer, Section> sections;
    private final BlockingQueue<SubProblem> problemQ = new ArrayBlockingQueue<SubProblem>(4096);
    private final int batchSize;

    public Worker(final Map<Integer, Section> sectionsIHave, final int batchSize) {
        this.sections = sectionsIHave;
        this.batchSize = batchSize;
    }

    public Set<Integer> getSectionIds() {
        return sections.keySet();
    }

    public void execute(final SubProblem command) throws InterruptedException {

        if (sections.containsKey(command.getSectionId())) {
            problemQ.put(command);
        } else {
            throw new IllegalArgumentException("Invalid section id for worker: " + command.getSectionId());
        }

    }

    @Override
    public void run() {
        final List<SubProblem> batch = new ArrayList<SubProblem>(batchSize);
        while (!Thread.interrupted()) {
            batch.clear();

            try {
                batch.add(problemQ.take());
                for (int i = 1; i < batchSize; i++) {
                    final SubProblem problem = problemQ.poll();
                    if (problem != null) {
                        batch.add(problem);
                    } else {
                        break;
                    }

                    process(batch);
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void process(final List<SubProblem> batch) {
        // Submit to remote process.
    }

    private static Map<Integer, Worker> indexWorkers(final List<Worker> workers) {
        final Map<Integer, Worker> temp = new HashMap<Integer, Worker>();
        for (final Worker worker : workers) {
            for (final Integer sectionId : worker.getSectionIds()) {
                temp.put(sectionId, worker);
            }
        }
        return Collections.unmodifiableMap(temp);
    }

    public static void main(final String[] args) throws InterruptedException {
     // Load workers, where worker is bound to single remote service
        final List<Worker> workers = getWorkers();
        final Map<Integer, Worker> workerReverseIndex = indexWorkers(workers);
        final List<SubProblem> subProblems = getSubProblems();
        for (final SubProblem problem : subProblems) {
            final Worker w = workerReverseIndex.get(problem.getSectionId());
            w.execute(problem);
        }
    }
}

【讨论】:

  • 谢谢!但是你知道,问题是我们的threads 有状态(实际上它们是不同的物理机器)。如果我只是在中间切开部分,那么向左或向右执行的线程可能没有那个“部分”......
猜你喜欢
  • 2012-02-16
  • 1970-01-01
  • 2020-11-18
  • 1970-01-01
  • 2014-03-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多