【发布时间】:2016-07-28 16:55:02
【问题描述】:
我的情况是,我有 N 个线程做相同的工作,而一个线程 X 做不同的事情。
每个线程 N 读取/写入一个静态对象,该对象属于一个类(称为 MMLCounter),它是 HashMap 的包装器,并且每个线程使用该 HashMap 的不同键/值对,因此所有线程都读取/写入值到HashMap 同时进行。 线程 X 需要定期访问所有值,并且在访问它们的同时(从访问第一个值到访问最后一个值,其他 N 个线程都不能更改 HashMap 中的值)。
HashMap在程序执行开始时在线程创建期间被初始化并添加键/值,之后不再添加新的键/值,只有HashMap中的值发生变化。
正因为如此,我没有使用 ConcurrentHashMap 或同步函数,而是创建了一个围绕 HashMap 的包装类,它还有一个标志,指示 N 个线程是否允许它们更改值,并且此标志是专门更改的通过线程 X。
这样所有 N 个线程都可以并行使用 HashMap,但是当线程 X 开始工作时,它只能使用 HashMap 直到它完成。
我的问题是我是否需要将任何内容声明为 volatile(例如 HashMap 中的值),如果需要,是什么以及如何声明?
我想避免的事情(不知道它是否可能发生)是 N 个线程中的一个更改了 HashMap 中的一个值,但该值的更改仅反映在该本地缓存中线程,当线程 X 从 HashMap 读取该值时,它将从其本地缓存内存中读取它,该内存与另一个 N 线程的本地缓存内存不同步,这意味着它将具有旧值。
代码如下:
public static void main(String[] args) throws ProtocolException {
int NUMBER_OF_THREADS = 400;
List<Future<?>> futureList = new ArrayList<Future<?>>();
ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS+1);
futureList.add(executor.submit(new Runnable() {
@Override
public void run() {
int measureInterval = 10000;
try {
Thread.sleep(measureInterval);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("--> MML rate is : " + MMLGenerator.MML_COUNTER.getMMLRate(measureInterval/1000) + " MML per second.");
}
}));
//create and start sending threads.
for (int threadNmbr = 0; threadNmbr < NUMBER_OF_THREADS; threadNmbr++) {
futureList.add(executor.submit(new Thread(new MMLGenerator(threadNmbr))));
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//wait for threads to finish.
for (Future<?> future : futureList) {
try {
future.get();
} catch (InterruptedException e) {
}
catch (ExecutionException e) {
throw (RuntimeException) e.getCause();
}
}
executor.shutdown();
}
class MMLGenerator implements Runnable {
public static volatile MMLCounter MML_COUNTER = new MMLCounter();
private int threadNmbr = 0;
public MMLGenerator(int threadNmbr) {
this.threadNmbr = threadNmbr;
MMLGenerator.MML_COUNTER.put(this.threadNmbr, 0);
}
@Override
public void run() {
while(RUN_ACTIVE) {
MML_COUNTER.increaseCounter(this.threadNmbr);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
public class MMLCounter {
private Map<Integer,Integer> MMLCounter = new HashMap<Integer, Integer>();
private boolean MMLCounterLocked = false;
public Integer get(Integer key) {
return this.MMLCounter.get(key);
}
public Integer put(Integer key, Integer value) {
while (this.MMLCounterLocked) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return this.MMLCounter.put(key, value);
}
public void increaseCounter(Integer key) {
while (this.MMLCounterLocked) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
this.MMLCounter.put(key,this.MMLCounter.get(key).intValue() + 1);
}
public int getMMLRate(int measurementTime) {
this.MMLCounterLocked = true;
int MMLCounterSum = 0;
for (Integer counterID : this.MMLCounter.keySet()) {
int counter = this.MMLCounter.get(counterID);
MMLCounterSum += counter;
this.MMLCounter.put(counterID, 0);
}
this.MMLCounterLocked = false;
return MMLCounterSum/measurementTime;
}
}
修改后
感谢大家的帮助。
我刚刚阅读了ReentrantReaderWriterLock 的描述,这确实是我需要的。下面是修改后的代码。
但是,我还有两个问题:
1) 如果我使用ReentrantReaderWriterLock 保护代码的关键部分,为什么还需要使用ConcurrentHashMap 而不是HashMap?
2) ReentrantReaderWriterLock 的这种用法只会从我之前的实现中替换我现在看到的标志的用法没有正确完成。但是,我仍然有HashMap 中的值对象不是易失性的问题,因此不同的线程将各自拥有自己的本地缓存值副本,该副本与来自其他线程的值的本地缓存副本不同步?
public static void main(String[] args) throws ProtocolException {
int NUMBER_OF_THREADS = 400;
List<Future<?>> futureList = new ArrayList<Future<?>>();
ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS+1);
futureList.add(executor.submit(new Runnable() {
@Override
public void run() {
int measureInterval = 10000;
try {
Thread.sleep(measureInterval);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("--> MML rate is : " + MMLGenerator.counter.getMMLRate(measureInterval/1000) + " MML per second.");
}
}));
//create and start sending threads.
for (int threadNmbr = 0; threadNmbr < NUMBER_OF_THREADS; threadNmbr++) {
futureList.add(executor.submit(new Thread(new MMLGenerator(threadNmbr))));
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//wait for threads to finish.
for (Future<?> future : futureList) {
try {
future.get();
} catch (InterruptedException e) {
}
catch (ExecutionException e) {
throw (RuntimeException) e.getCause();
}
}
executor.shutdown();
}
class MMLGenerator implements Runnable {
public static MMLCounter counter = new MMLCounter();
private int threadNmbr = 0;
public MMLGenerator(int threadNmbr) {
this.threadNmbr = threadNmbr;
MMLCounter.counter.put(this.threadNmbr, 0);
}
@Override
public void run() {
while(RUN_ACTIVE) {
MMLCounter.counter.increaseCounter(this.threadNmbr);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
public class MMLCounter {
private Map<Integer,Integer> counter = new HashMap<Integer, Integer>();
public static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
public Integer put(Integer key, Integer value) {
lock.readLock().lock();
Integer oldValue = this.counter.put(key, value);
lock.readLock().unlock();
return oldValue;
}
public void increaseCounter(Integer key) {
lock.readLock().lock();
this.counter.put(key,this.counter.get(key).intValue() + 1);
lock.readLock().unlock();
}
public int getMMLRate(int measurementTime) {
lock.writeLock().lock();
int counterSum = 0;
for (Integer counterID : this.counter.keySet()) {
counterSum += this.counter.get(counterID);;
this.counter.put(counterID, 0);
}
lock.writeLock().unlock();
return counterSum/measurementTime;
}
}
第二次修改后
我现在发现我需要实现的逻辑需要我从多个线程操作多个计数器,而不仅仅是一个,并且每个线程在任何时候都可以更改任何计数器。 下面是我的实现,但我不确定我在性能和数据一致性方面是否做得很好。
我除了有随机数量的计数器(计数器的数量将在执行开始时知道)将由字符串值标识,并且每个计数器必须计算两个值(第一个值总是会增加,第二个值有时只会增加,但如果它们增加,它们需要同时增加)。当我需要每个计数器的总和时,我需要在原子操作中获取两个计数器值,并且从我获取第一个计数器到我获取最后一个计数器的时间,其他线程都不能更改任何计数器。
出于演示目的,作为 Counter 标识(HashMap 中的键),我取了 Counter 的序号的 String 值,并确定在每个线程的每次迭代中需要增加哪个 Counter,以及确定是否只有一个或两个值计数器需要增加,我用的是随机生成器。
public static void main(String[] args) {
int NUMBER_OF_THREADS = 400;
MMLGenerator.counterNmbr(2);
List<Future<?>> futureList = new ArrayList<Future<?>>();
ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS+1);
futureList.add(executor.submit(new Runnable() {
@Override
public void run() {
while(true)
{
int measureInterval = 10;
try {
TimeUnit.SECONDS.sleep(measureInterval);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
MMLGenerator.lock.writeLock().lock();
for (String counterId : MMLGenerator.counter.keySet()) {
MMLCounterSimple counter = MMLGenerator.counter.get(counterId).getCountAndReset();
System.out.println("--> For counter " + counterId + " total is : " + counter.getTotal() + ", and failed is : " + counter.getFailed());
}
MMLGenerator.lock.writeLock().unlock();
}
}
}));
//create and start sending threads.
for (int threadNmbr = 0; threadNmbr < NUMBER_OF_THREADS; threadNmbr++) {
futureList.add(executor.submit(new Thread(new MMLGenerator())));
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//wait for threads to finish.
for (Future<?> future : futureList) {
try {
future.get();
} catch (InterruptedException e) {
}
catch (ExecutionException e) {
throw (RuntimeException) e.getCause();
}
}
executor.shutdown();
}
class MMLGenerator implements Runnable {
public static volatile HashMap<String, MMLCounter> counter = new HashMap<String, MMLCounter>();
public static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
public static void counterNmbr(int counterNmbr) {
lock.writeLock().lock();
for(int i = 0; i < counterNmbr; i++) {
counter.put(new Integer(i).toString(), new MMLCounter());
}
lock.writeLock().unlock();
}
@Override
public void run() {
while(RUN_PROVISIONING) {
lock.readLock().lock();
String counterID = new Integer(new Random().nextInt(counter.size())).toString();
long failedInc = 0;
if (new Random().nextInt(2) == 0) {
failedInc = 1;
}
counter.get(counterID).increaseCounter(failedInc);
lock.readLock().unlock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
public class MMLCounter {
private volatile long total = 0;
private volatile long failed = 0;
public static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
public synchronized void increaseCounter(long failedInc) {
lock.writeLock().lock();
total++;;
failed = failed + failedInc;
lock.writeLock().unlock();
}
public synchronized MMLCounterSimple getCountAndReset() {
lock.writeLock().lock();
MMLCounterSimple simpleCounter = new MMLCounterSimple(total, failed);
total = 0;
failed = 0;
lock.writeLock().unlock();
return simpleCounter;
}
}
public class MMLCounterSimple {
private long total = 0;
private long failed = 0;
public MMLCounterSimple(long total, long failed) {
this.total = total;
this.failed = failed;
}
public long getTotal() {
return this.total;
}
public long getFailed() {
return this.failed;
}
}
【问题讨论】:
-
map值的类型是什么?它可能会对答案产生影响。另外,请删除与问题无关的所有代码。例如,只显示更新地图等的几行。我们不需要或不想看到整个班级(其中有很多与问题无关的内容)
-
地图中的值为整数。
-
顺便说一句。我删除了不相关的代码。
-
我猜你做过一些.net/c#?在 java 中,约定是为方法、字段、变量和参数使用前导小写名称。只有类名以大写字母开头。在阅读您的代码时,我们 Java 人员非常困惑,例如,我们假设
MMLCounter是一个类,而它实际上是一个字段。最好将其命名为mmlCounter,或仅命名为counter(因为您的班级中没有其他计数器) -
您需要跟踪每个单独线程的计数吗?或者只是所有线程的总和?也就是说,你是用
MMLCounter的get()方法,还是只用getMMLRate()?
标签: java multithreading hashmap