【问题标题】:Java program runtime unchanged when adding more threads and produces different results添加更多线程并产生不同结果时,Java 程序运行时不变
【发布时间】:2017-08-16 16:00:45
【问题描述】:

这是我的 ConcurrentApp 类的代码,这是我的问题的根源:

class Processor implements Runnable {

    private int id;
    private Integer interaction;
    private Set<Integer> subset;
    private Set<Integer> y;
    private Object lock = new Object();

    public DCP<BSN> dcp;



    public Processor(int id, Integer interaction, Set<Integer> subset, DCP<BSN> dcp, Set<Integer> y) {
        this.id = id;
        this.interaction = interaction;
        this.subset= subset;
        this.dcp = dcp;
        this.y = y;
    }

    public void run() {
        //System.out.println("Starting: " + this.id);
        if (this.y.contains(this.interaction)){
            this.subset.add(this.interaction);
            processRemoval(this.subset);
        }

        //System.out.println("Completed: " + this.id);
    }

    public void processRemoval(Set<Integer> collection){
        synchronized(Processor.lock) {
            for (Iterator<Integer> iter = this.y.iterator(); iter.hasNext(); ) {
                int element = iter.next();
                while(element != this.interaction){
                    element = iter.next();
                }
                this.dcp.increaseScore(collection);
                if (!collection.contains(this.interaction)) {
                    System.out.println(element + " WAS REMOVED by thread " + this.id);
                    iter.remove();
                }
            }
        }
    }

}


public class ConcurrentApp {

    public void multiRP (DCP<BSN> dcp, int threads) {

        ConcurrentHashMap<Integer,Boolean> x = new ConcurrentHashMap<Integer,Boolean>();
        ConcurrentHashMap<Integer,Boolean> z = new ConcurrentHashMap<Integer,Boolean>();
        Set<Integer> y = (Set<Integer>) Collections.newSetFromMap(x);
        y.addAll(dcp.PA);
        Set<Integer> zeta = (Set<Integer>) Collections.newSetFromMap(z);
        ExecutorService executor = Executors.newFixedThreadPool(threads);

        int i =1;
        while ((y.size() > i) && (i <= dcp.R)){
            for (Iterator<Integer> iterator = y.iterator(); iterator.hasNext();){
                zeta.addAll(y);
                Integer interaction = iterator.next();
                zeta.remove(interaction);
                ArrayList<Set<Integer>> subsets = dcp.getSubsets(zeta, i);
                for (int j = 0; j< subsets.size(); j++){
                    executor.submit(new Processor(j, interaction, subsets.get(j), dcp, y)); 
                }
            }
            i++;
        }
        executor.shutdown();
        System.out.println("All tasks submitted");
        try {
            executor.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(y);
        dcp.PA = new ArrayList<Integer>(y);
        System.out.println("All tasks completed");
    }
}

以及包含一些辅助函数的类 DCP 的相关代码在这里:

public class DCP<E> extends CondInd<E> {
    public final int R;             //assumed maximum # of nodes blocking any node from target
    public ArrayList <Integer> PA;  //set of all parents of the target node (should be static)
    //public Node NULL;     //NULL is model with no parents of target (should be final static)
    public E scoreType = null;
    public ScoringFunction<? super E> scoringFunction;

    public double calcScore(E sT, Set<Integer> parentIndices) {
        ArrayList<Integer> list = new ArrayList<Integer>(parentIndices);
        return this.scoringFunction.score(sT, list);
    }

    public double calcScore(E sT, ArrayList<Integer> parentIndices) {
        return this.scoringFunction.score(sT, parentIndices);
    }


    //helper for actual subsets method
    private void getSubsets(ArrayList<Integer> input, int length, int start_index, Set<Integer> acc, ArrayList<Set<Integer>> sol){
        //base case
        if (acc.size() == length){
            sol.add(new HashSet<>(acc));
            return;
        }
        //recursive solution
        if (start_index == input.size()) return;
        int x = input.get(start_index);
        acc.add(x);
        //first recursion
        getSubsets(input, length, start_index+1, acc, sol);
        acc.remove(x);
        //second recursion, after x removed
        getSubsets(input, length, start_index+1, acc, sol);
    }

    //different arguments and returns a list of subsets
    public ArrayList<Set<Integer>> getSubsets(ArrayList<Integer> input, int length){
        ArrayList<Set<Integer>> sol = new ArrayList<>();
        getSubsets(input, length, 0, new HashSet<Integer>(), sol);
        return sol;
    }

    //different arguments and returns a list of subsets
    public ArrayList<Set<Integer>> getSubsets(Set<Integer> input, int length){
        ArrayList<Set<Integer>> sol = new ArrayList<>();
        ArrayList<Integer> copy = new ArrayList<Integer>(input);
        getSubsets(copy, length, 0, new HashSet<Integer>(), sol);
        return sol;
    }


    //removes the element from the input that increases the score by the highest value
    public void increaseScore(Set<Integer> input){
        int index = -1;
        double score = calcScore(scoreType,input);
        List<Integer> list = new ArrayList<Integer>(input);
        for (Integer element : list) {
            ArrayList<Integer> copy_list = new ArrayList<Integer>(list);
            copy_list.remove(element);
            if (calcScore(scoreType,copy_list) > score){
                index = list.indexOf(element);
                score = calcScore(scoreType,copy_list);
            }
        }
        if (index != -1)
            input.remove(list.get(index));
    }

    public DCP(int maximumNodes, E scoreType, ScoringFunction<? super E> scoringFunction, ArrayList<Integer> parents){
        this.R = maximumNodes;
        this.scoreType = scoreType;
        this.scoringFunction = scoringFunction;
        this.PA = parents;
    }
}

当我在线程 = 1 的 ConcurrentApp 中运行我的代码时,我会根据我的打印语句在控制台中收到以下打印输出:

All tasks submitted
0 WAS REMOVED by thread 1
1 WAS REMOVED by thread 0
2 WAS REMOVED by thread 0
3 WAS REMOVED by thread 0
4 WAS REMOVED by thread 1
5 WAS REMOVED by thread 0
6 WAS REMOVED by thread 0
[7, 8]
All tasks completed
Program completed in :22 seconds

其中第一个数字对应于从我的列表中删除的整数(例如,“0 WAS REMOVED by thread 2”表示值 0 已从主列表 y 中删除)。这个输出是有意义的,因为每个需要删除的值都被删除了一次,并给出了预期的结果 [7, 8],这应该是在这种情况下唯一没有被删除的两个值。

但是,当我使用 >1 个线程运行我的代码时,我得到以下输出:

All tasks submitted
0 WAS REMOVED by thread 2
1 WAS REMOVED by thread 1
2 WAS REMOVED by thread 0
2 WAS REMOVED by thread 1
3 WAS REMOVED by thread 1
3 WAS REMOVED by thread 2
4 WAS REMOVED by thread 0
4 WAS REMOVED by thread 1
5 WAS REMOVED by thread 0
5 WAS REMOVED by thread 1
6 WAS REMOVED by thread 0
6 WAS REMOVED by thread 1
7 WAS REMOVED by thread 1
7 WAS REMOVED by thread 2
8 WAS REMOVED by thread 1
8 WAS REMOVED by thread 0
[]
All tasks completed
Program completed in :0 seconds

如您所见,在某些情况下,相同的值被多次删除,因为多个线程决定需要删除该值。另一个问题是,这也通过给我一个 [] 而不是 [7, 8] 来改变我的结果,因为由于某种原因,当我使用多个线程时,程序错误地决定需要从主列表 y 中删除 7 和 8。我通过将 static 添加到锁定字段来解决了多线程删除的问题:

private static Object lock = new Object();

但是,现在我有一个问题,即当我增加线程数时运行时不会改变。添加 static 后 using threads >= 1 的输出如下:

All tasks submitted
0 WAS REMOVED by thread 1
1 WAS REMOVED by thread 1
2 WAS REMOVED by thread 1
3 WAS REMOVED by thread 0
4 WAS REMOVED by thread 1
5 WAS REMOVED by thread 0
6 WAS REMOVED by thread 0
[7, 8]
All tasks completed
Program completed in :22 seconds

线程数并没有提高运行时间,但我得到了正确的结果。无论我使用 1 个线程还是多个线程,这个结果和运行时都是完全相同的。

问题:在我看来,有两种可能的解决方案:

1) 移除锁上的static关键字,并找到一种方法让执行移除的线程告诉其他线程跳过被移除的值

2) 保留 static 关键字并找出为什么我的程序在有更多线程可用时只使用 1 个线程。

非常感谢任何想法!

【问题讨论】:

  • 很难相信所有代码都与问题相关。你能试着把它减少到minimal reproducible example吗?
  • @shmosel 第一个代码块是解决问题所需的主要代码。我之前发布过类似的代码,并被告知还要包含第二个代码块以使事情变得更容易。请关注第一个代码块,但当您需要了解第一个代码块中的一些函数/类时,请参考第二个代码块。我相信这是一个完整的示例,但是,由于运行代码所需的各种包和依赖项,一个可验证的示例将需要更多的代码。

标签: java multithreading concurrency locking java.util.concurrent


【解决方案1】:

(我的朋友,你真的必须学会发布最少的代码)。

我的诊断:您的程序在多线程中的行为相同,因为processRemoval 在处理整个集合之前与其余线程同步,所以难怪为什么该集合只被第一个处理线程。

在这些情况下,通常的方法是在处理每个项目之前同步线程。因此,您似乎应该将 synchronize 移到 循环中。

但是,在这种情况下,您正在修改循环中的集合,这可能会产生ConcurrentModificationException。为避免这种情况,我建议您还将 HashSet 的使用替换为 Set 的另一个并发实现。例如,ConcurrentSkipListSetCopyOnWriteArraySet,请自行选择。

【讨论】:

  • 欣赏输入。我尝试移动 synchronize 关键字,但似乎没有做任何事情(我将它移动到 for 循环中并尝试了 ProcessRemoval 中的不同位置)。我也没有得到任何ConcurrentModificationException,因为我的被多个线程访问的集合都由ConcurrentHashMap的线程安全实现支持
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2011-04-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-02-16
  • 1970-01-01
相关资源
最近更新 更多