【发布时间】:2017-08-16 16:00:45
【问题描述】:
这是我的 ConcurrentApp 类的代码,这是我的问题的根源:
class Processor implements Runnable {
private int id;
private Integer interaction;
private Set<Integer> subset;
private Set<Integer> y;
private Object lock = new Object();
public DCP<BSN> dcp;
public Processor(int id, Integer interaction, Set<Integer> subset, DCP<BSN> dcp, Set<Integer> y) {
this.id = id;
this.interaction = interaction;
this.subset= subset;
this.dcp = dcp;
this.y = y;
}
public void run() {
//System.out.println("Starting: " + this.id);
if (this.y.contains(this.interaction)){
this.subset.add(this.interaction);
processRemoval(this.subset);
}
//System.out.println("Completed: " + this.id);
}
public void processRemoval(Set<Integer> collection){
synchronized(Processor.lock) {
for (Iterator<Integer> iter = this.y.iterator(); iter.hasNext(); ) {
int element = iter.next();
while(element != this.interaction){
element = iter.next();
}
this.dcp.increaseScore(collection);
if (!collection.contains(this.interaction)) {
System.out.println(element + " WAS REMOVED by thread " + this.id);
iter.remove();
}
}
}
}
}
public class ConcurrentApp {
public void multiRP (DCP<BSN> dcp, int threads) {
ConcurrentHashMap<Integer,Boolean> x = new ConcurrentHashMap<Integer,Boolean>();
ConcurrentHashMap<Integer,Boolean> z = new ConcurrentHashMap<Integer,Boolean>();
Set<Integer> y = (Set<Integer>) Collections.newSetFromMap(x);
y.addAll(dcp.PA);
Set<Integer> zeta = (Set<Integer>) Collections.newSetFromMap(z);
ExecutorService executor = Executors.newFixedThreadPool(threads);
int i =1;
while ((y.size() > i) && (i <= dcp.R)){
for (Iterator<Integer> iterator = y.iterator(); iterator.hasNext();){
zeta.addAll(y);
Integer interaction = iterator.next();
zeta.remove(interaction);
ArrayList<Set<Integer>> subsets = dcp.getSubsets(zeta, i);
for (int j = 0; j< subsets.size(); j++){
executor.submit(new Processor(j, interaction, subsets.get(j), dcp, y));
}
}
i++;
}
executor.shutdown();
System.out.println("All tasks submitted");
try {
executor.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(y);
dcp.PA = new ArrayList<Integer>(y);
System.out.println("All tasks completed");
}
}
以及包含一些辅助函数的类 DCP 的相关代码在这里:
public class DCP<E> extends CondInd<E> {
public final int R; //assumed maximum # of nodes blocking any node from target
public ArrayList <Integer> PA; //set of all parents of the target node (should be static)
//public Node NULL; //NULL is model with no parents of target (should be final static)
public E scoreType = null;
public ScoringFunction<? super E> scoringFunction;
public double calcScore(E sT, Set<Integer> parentIndices) {
ArrayList<Integer> list = new ArrayList<Integer>(parentIndices);
return this.scoringFunction.score(sT, list);
}
public double calcScore(E sT, ArrayList<Integer> parentIndices) {
return this.scoringFunction.score(sT, parentIndices);
}
//helper for actual subsets method
private void getSubsets(ArrayList<Integer> input, int length, int start_index, Set<Integer> acc, ArrayList<Set<Integer>> sol){
//base case
if (acc.size() == length){
sol.add(new HashSet<>(acc));
return;
}
//recursive solution
if (start_index == input.size()) return;
int x = input.get(start_index);
acc.add(x);
//first recursion
getSubsets(input, length, start_index+1, acc, sol);
acc.remove(x);
//second recursion, after x removed
getSubsets(input, length, start_index+1, acc, sol);
}
//different arguments and returns a list of subsets
public ArrayList<Set<Integer>> getSubsets(ArrayList<Integer> input, int length){
ArrayList<Set<Integer>> sol = new ArrayList<>();
getSubsets(input, length, 0, new HashSet<Integer>(), sol);
return sol;
}
//different arguments and returns a list of subsets
public ArrayList<Set<Integer>> getSubsets(Set<Integer> input, int length){
ArrayList<Set<Integer>> sol = new ArrayList<>();
ArrayList<Integer> copy = new ArrayList<Integer>(input);
getSubsets(copy, length, 0, new HashSet<Integer>(), sol);
return sol;
}
//removes the element from the input that increases the score by the highest value
public void increaseScore(Set<Integer> input){
int index = -1;
double score = calcScore(scoreType,input);
List<Integer> list = new ArrayList<Integer>(input);
for (Integer element : list) {
ArrayList<Integer> copy_list = new ArrayList<Integer>(list);
copy_list.remove(element);
if (calcScore(scoreType,copy_list) > score){
index = list.indexOf(element);
score = calcScore(scoreType,copy_list);
}
}
if (index != -1)
input.remove(list.get(index));
}
public DCP(int maximumNodes, E scoreType, ScoringFunction<? super E> scoringFunction, ArrayList<Integer> parents){
this.R = maximumNodes;
this.scoreType = scoreType;
this.scoringFunction = scoringFunction;
this.PA = parents;
}
}
当我在线程 = 1 的 ConcurrentApp 中运行我的代码时,我会根据我的打印语句在控制台中收到以下打印输出:
All tasks submitted
0 WAS REMOVED by thread 1
1 WAS REMOVED by thread 0
2 WAS REMOVED by thread 0
3 WAS REMOVED by thread 0
4 WAS REMOVED by thread 1
5 WAS REMOVED by thread 0
6 WAS REMOVED by thread 0
[7, 8]
All tasks completed
Program completed in :22 seconds
其中第一个数字对应于从我的列表中删除的整数(例如,“0 WAS REMOVED by thread 2”表示值 0 已从主列表 y 中删除)。这个输出是有意义的,因为每个需要删除的值都被删除了一次,并给出了预期的结果 [7, 8],这应该是在这种情况下唯一没有被删除的两个值。
但是,当我使用 >1 个线程运行我的代码时,我得到以下输出:
All tasks submitted
0 WAS REMOVED by thread 2
1 WAS REMOVED by thread 1
2 WAS REMOVED by thread 0
2 WAS REMOVED by thread 1
3 WAS REMOVED by thread 1
3 WAS REMOVED by thread 2
4 WAS REMOVED by thread 0
4 WAS REMOVED by thread 1
5 WAS REMOVED by thread 0
5 WAS REMOVED by thread 1
6 WAS REMOVED by thread 0
6 WAS REMOVED by thread 1
7 WAS REMOVED by thread 1
7 WAS REMOVED by thread 2
8 WAS REMOVED by thread 1
8 WAS REMOVED by thread 0
[]
All tasks completed
Program completed in :0 seconds
如您所见,在某些情况下,相同的值被多次删除,因为多个线程决定需要删除该值。另一个问题是,这也通过给我一个 [] 而不是 [7, 8] 来改变我的结果,因为由于某种原因,当我使用多个线程时,程序错误地决定需要从主列表 y 中删除 7 和 8。我通过将 static 添加到锁定字段来解决了多线程删除的问题:
private static Object lock = new Object();
但是,现在我有一个问题,即当我增加线程数时运行时不会改变。添加 static 后 using threads >= 1 的输出如下:
All tasks submitted
0 WAS REMOVED by thread 1
1 WAS REMOVED by thread 1
2 WAS REMOVED by thread 1
3 WAS REMOVED by thread 0
4 WAS REMOVED by thread 1
5 WAS REMOVED by thread 0
6 WAS REMOVED by thread 0
[7, 8]
All tasks completed
Program completed in :22 seconds
线程数并没有提高运行时间,但我得到了正确的结果。无论我使用 1 个线程还是多个线程,这个结果和运行时都是完全相同的。
问题:在我看来,有两种可能的解决方案:
1) 移除锁上的static关键字,并找到一种方法让执行移除的线程告诉其他线程跳过被移除的值
2) 保留 static 关键字并找出为什么我的程序在有更多线程可用时只使用 1 个线程。
非常感谢任何想法!
【问题讨论】:
-
很难相信所有代码都与问题相关。你能试着把它减少到minimal reproducible example吗?
-
@shmosel 第一个代码块是解决问题所需的主要代码。我之前发布过类似的代码,并被告知还要包含第二个代码块以使事情变得更容易。请关注第一个代码块,但当您需要了解第一个代码块中的一些函数/类时,请参考第二个代码块。我相信这是一个完整的示例,但是,由于运行代码所需的各种包和依赖项,一个可验证的示例将需要更多的代码。
标签: java multithreading concurrency locking java.util.concurrent