【问题标题】:Java - HashMap values - volatileJava - HashMap 值 - 易失性
【发布时间】:2016-07-28 16:55:02
【问题描述】:

我的情况是,我有 N 个线程做相同的工作,而一个线程 X 做不同的事情。

每个线程 N 读取/写入一个静态对象,该对象属于一个类(称为 MMLCounter),它是 HashMap 的包装器,并且每个线程使用该 HashMap 的不同键/值对,因此所有线程都读取/写入值到HashMap 同时进行。 线程 X 需要定期访问所有值,并且在访问它们的同时(从访问第一个值到访问最后一个值,其他 N 个线程都不能更改 HashMap 中的值)。

HashMap在程序执行开始时在线程创建期间被初始化并添加键/值,之后不再添加新的键/值,只有HashMap中的值发生变化。

正因为如此,我没有使用 ConcurrentHashMap 或同步函数,而是创建了一个围绕 HashMap 的包装类,它还有一个标志,指示 N 个线程是否允许它们更改值,并且此标志是专门更改的通过线程 X。

这样所有 N 个线程都可以并行使用 HashMap,但是当线程 X 开始工作时,它只能使用 HashMap 直到它完成。

我的问题是我是否需要将任何内容声明为 volatile(例如 HashMap 中的值),如果需要,是什么以及如何声明?

我想避免的事情(不知道它是否可能发生)是 N 个线程中的一个更改了 HashMap 中的一个值,但该值的更改仅反映在该本地缓存中线程,当线程 X 从 HashMap 读取该值时,它将从其本地缓存内存中读取它,该内存与另一个 N 线程的本地缓存内存不同步,这意味着它将具有旧值。

代码如下:

public static void main(String[] args) throws ProtocolException { 

    int NUMBER_OF_THREADS = 400; 

    List<Future<?>> futureList = new ArrayList<Future<?>>(); 
    ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS+1);
    futureList.add(executor.submit(new Runnable() {
        @Override
        public void run() { 
                int measureInterval = 10000;
                try {
                    Thread.sleep(measureInterval);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } 
                System.out.println("--> MML rate is : " + MMLGenerator.MML_COUNTER.getMMLRate(measureInterval/1000) + " MML per second.");
        }
    }));

    //create and start sending threads. 
    for (int threadNmbr = 0; threadNmbr < NUMBER_OF_THREADS; threadNmbr++) {
        futureList.add(executor.submit(new Thread(new MMLGenerator(threadNmbr))));
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    } 

    //wait for threads to finish. 
    for (Future<?> future : futureList) { 
        try { 
            future.get(); 
        } catch (InterruptedException e) {
        } 
        catch (ExecutionException e) { 
            throw (RuntimeException) e.getCause(); 
        } 
    } 
    executor.shutdown();
} 

class MMLGenerator implements Runnable {

    public static volatile MMLCounter MML_COUNTER = new MMLCounter();
    private int threadNmbr = 0;

    public MMLGenerator(int threadNmbr) {
        this.threadNmbr = threadNmbr;
        MMLGenerator.MML_COUNTER.put(this.threadNmbr, 0);

    }

    @Override
    public void run() {
        while(RUN_ACTIVE) {
            MML_COUNTER.increaseCounter(this.threadNmbr);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

public class MMLCounter {

  private Map<Integer,Integer> MMLCounter = new HashMap<Integer, Integer>();
  private boolean MMLCounterLocked = false;

  public Integer get(Integer key) {
      return this.MMLCounter.get(key);
  }

  public Integer put(Integer key, Integer value) {
      while (this.MMLCounterLocked) {
          try {
              Thread.sleep(100);
          } catch (InterruptedException e) {
              // TODO Auto-generated catch block
              e.printStackTrace();
          }
      }
      return this.MMLCounter.put(key, value);
  }

  public void increaseCounter(Integer key) {
      while (this.MMLCounterLocked) {
          try {
              Thread.sleep(100);
          } catch (InterruptedException e) {
              // TODO Auto-generated catch block
              e.printStackTrace();
          }
      }
      this.MMLCounter.put(key,this.MMLCounter.get(key).intValue() + 1);
  }

  public int getMMLRate(int measurementTime) {
      this.MMLCounterLocked = true;
      int MMLCounterSum = 0;
      for (Integer counterID : this.MMLCounter.keySet()) {
          int counter = this.MMLCounter.get(counterID);
          MMLCounterSum += counter;
          this.MMLCounter.put(counterID, 0);
      }
      this.MMLCounterLocked = false;
      return MMLCounterSum/measurementTime;
  }

}


修改后

感谢大家的帮助。 我刚刚阅读了ReentrantReaderWriterLock 的描述,这确实是我需要的。下面是修改后的代码。

但是,我还有两个问题:

1) 如果我使用ReentrantReaderWriterLock 保护代码的关键部分,为什么还需要使用ConcurrentHashMap 而不是HashMap

2) ReentrantReaderWriterLock 的这种用法只会从我之前的实现中替换我现在看到的标志的用法没有正确完成。但是,我仍然有HashMap 中的值对象不是易失性的问题,因此不同的线程将各自拥有自己的本地缓存值副本,该副本与来自其他线程的值的本地缓存副本不同步?

public static void main(String[] args) throws ProtocolException { 

    int NUMBER_OF_THREADS = 400; 

    List<Future<?>> futureList = new ArrayList<Future<?>>(); 
    ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS+1);
    futureList.add(executor.submit(new Runnable() {
        @Override
        public void run() { 
                int measureInterval = 10000;
                try {
                    Thread.sleep(measureInterval);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } 
                System.out.println("--> MML rate is : " + MMLGenerator.counter.getMMLRate(measureInterval/1000) + " MML per second.");
        }
    }));

    //create and start sending threads. 
    for (int threadNmbr = 0; threadNmbr < NUMBER_OF_THREADS; threadNmbr++) {
        futureList.add(executor.submit(new Thread(new MMLGenerator(threadNmbr))));
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    } 

    //wait for threads to finish. 
    for (Future<?> future : futureList) { 
        try { 
            future.get(); 
        } catch (InterruptedException e) {
        } 
        catch (ExecutionException e) { 
            throw (RuntimeException) e.getCause(); 
        } 
    } 
    executor.shutdown();
} 

class MMLGenerator implements Runnable {

    public static MMLCounter counter = new MMLCounter();
    private int threadNmbr = 0;

    public MMLGenerator(int threadNmbr) {
        this.threadNmbr = threadNmbr;
        MMLCounter.counter.put(this.threadNmbr, 0);

    }

    @Override
    public void run() {
        while(RUN_ACTIVE) {
            MMLCounter.counter.increaseCounter(this.threadNmbr);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

public class MMLCounter {

  private Map<Integer,Integer> counter = new HashMap<Integer, Integer>();
  public static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

  public Integer put(Integer key, Integer value) {
      lock.readLock().lock();
      Integer oldValue = this.counter.put(key, value);
      lock.readLock().unlock();
      return oldValue;
  }

  public void increaseCounter(Integer key) {
      lock.readLock().lock();
      this.counter.put(key,this.counter.get(key).intValue() + 1);
      lock.readLock().unlock();
  }

  public int getMMLRate(int measurementTime) {
      lock.writeLock().lock();
      int counterSum = 0;
      for (Integer counterID : this.counter.keySet()) {
          counterSum += this.counter.get(counterID);;
          this.counter.put(counterID, 0);
      }
      lock.writeLock().unlock();
      return counterSum/measurementTime;
  }
}

第二次修改后

我现在发现我需要实现的逻辑需要我从多个线程操作多个计数器,而不仅仅是一个,并且每个线程在任何时候都可以更改任何计数器。 下面是我的实现,但我不确定我在性能和数据一致性方面是否做得很好。

我除了有随机数量的计数器(计数器的数量将在执行开始时知道)将由字符串值标识,并且每个计数器必须计算两个值(第一个值总是会增加,第二个值有时只会增加,但如果它们增加,它们需要同时增加)。当我需要每个计数器的总和时,我需要在原子操作中获取两个计数器值,并且从我获取第一个计数器到我获取最后一个计数器的时间,其他线程都不能更改任何计数器。

出于演示目的,作为 Counter 标识(HashMap 中的键),我取了 Counter 的序号的 String 值,并确定在每个线程的每次迭代中需要增加哪个 Counter,以及确定是否只有一个或两个值计数器需要增加,我用的是随机生成器。

public static void main(String[] args) { 

    int NUMBER_OF_THREADS = 400;


    MMLGenerator.counterNmbr(2);
    List<Future<?>> futureList = new ArrayList<Future<?>>(); 
    ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS+1); 

    futureList.add(executor.submit(new Runnable() {
        @Override
        public void run() { 
            while(true)
            {
                int measureInterval = 10;
                try {
                    TimeUnit.SECONDS.sleep(measureInterval);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                MMLGenerator.lock.writeLock().lock();
                for (String counterId : MMLGenerator.counter.keySet()) {
                    MMLCounterSimple counter = MMLGenerator.counter.get(counterId).getCountAndReset();
                    System.out.println("--> For counter " + counterId + " total is : " + counter.getTotal() + ", and failed is : " + counter.getFailed());
                }
                MMLGenerator.lock.writeLock().unlock();
            }
        }
    }));

    //create and start sending threads. 
    for (int threadNmbr = 0; threadNmbr < NUMBER_OF_THREADS; threadNmbr++) {
        futureList.add(executor.submit(new Thread(new MMLGenerator())));
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    } 

    //wait for threads to finish. 
    for (Future<?> future : futureList) { 
        try { 
            future.get(); 
        } catch (InterruptedException e) {
        } 
        catch (ExecutionException e) { 
            throw (RuntimeException) e.getCause(); 
        } 
    } 
    executor.shutdown();
} 



class MMLGenerator implements Runnable {

    public static volatile HashMap<String, MMLCounter> counter = new HashMap<String, MMLCounter>();
    public static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);


    public static void counterNmbr(int counterNmbr) {
        lock.writeLock().lock();
        for(int i = 0; i < counterNmbr; i++) {
            counter.put(new Integer(i).toString(), new MMLCounter());
        }
        lock.writeLock().unlock();
    }

    @Override
    public void run() {
        while(RUN_PROVISIONING) {
            lock.readLock().lock();
            String counterID = new Integer(new Random().nextInt(counter.size())).toString();
            long failedInc = 0;
            if (new Random().nextInt(2) == 0) {
                failedInc = 1;
            }
            counter.get(counterID).increaseCounter(failedInc);
            lock.readLock().unlock();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

}


public class MMLCounter {

    private volatile long total = 0;
    private volatile long failed = 0;
    public static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);


    public synchronized void increaseCounter(long failedInc) {
        lock.writeLock().lock();
        total++;;
        failed = failed + failedInc;
        lock.writeLock().unlock();
    }

    public synchronized MMLCounterSimple getCountAndReset() {
        lock.writeLock().lock();
        MMLCounterSimple simpleCounter = new MMLCounterSimple(total, failed);
        total = 0;
        failed = 0;
        lock.writeLock().unlock();
        return simpleCounter;
    }

}


public class MMLCounterSimple {

    private long total = 0;
    private long failed = 0;

    public MMLCounterSimple(long total, long failed) {
        this.total = total;
        this.failed = failed;
    }

    public long getTotal() {
        return this.total;
    }

    public long getFailed() {
        return this.failed;
    }
}

【问题讨论】:

  • map值的类型是什么?它可能会对答案产生影响。另外,请删除与问题无关的所有代码。例如,只显示更新地图等的几行。我们不需要或不想看到整个班级(其中有很多与问题无关的内容)
  • 地图中的值为整数。
  • 顺便说一句。我删除了不相关的代码。
  • 我猜你做过一些.net/c#?在 java 中,约定是为方法、字段、变量和参数使用前导小写名称。只有类名以大写字母开头。在阅读您的代码时,我们 Java 人员非常困惑,例如,我们假设 MMLCounter 是一个类,而它实际上是一个字段。最好将其命名为 mmlCounter,或仅命名为 counter(因为您的班级中没有其他计数器)
  • 您需要跟踪每个单独线程的计数吗?或者只是所有线程的总和?也就是说,你是用MMLCounterget()方法,还是只用getMMLRate()

标签: java multithreading hashmap


【解决方案1】:

正如所写,不能保证按预期工作。 N 个线程执行的写入和 X 线程执行的读取之间没有同步点。甚至MMLCounterLocked 标志也可能被作者忽略。

除了使您的代码正常工作之外,使用像 ConcurrentMap 这样的更高级别的并发工具将大大简化您的代码。


由于您只需要总和,LongAccumulator 就足够了,它使代码非常简单和安全。

  public static void main(String[] args)
    throws Exception
  {
    int NUMBER_OF_THREADS = 400;
    ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
    LongAccumulator sum = new LongAccumulator(Long::sum, 0);
    for (int i = 0; i < NUMBER_OF_THREADS; ++i) {
      executor.submit(new MMLGenerator(sum));
      TimeUnit.MILLISECONDS.sleep(50); /* Why??? */
    }
    int interval = 10;
    TimeUnit.SECONDS.sleep(interval);
    long rate = sum.getThenReset() / interval;
    System.out.println("--> MML rate is : " + rate + " MML per second.");
    executor.shutdownNow();
  }

  private static final class MMLGenerator
    implements Runnable
  {

    private final LongAccumulator counter;

    MMLGenerator(LongAccumulator counter)
    {
      this.counter = counter;
    }

    @Override
    public void run()
    {
      while (true) {
        counter.accumulate(1);
        try {
          TimeUnit.SECONDS.sleep(1);
        }
        catch (InterruptedException shutdown) {
          break;
        }
      }
    }

  }

关于你的两个新问题:

  1. “如果我使用ReentrantReaderWriterLock 保护代码的关键部分,为什么还需要使用ConcurrentHashMap 而不是HashMap?”

没有使用读写锁保护关键部分。当您 write 表时,您正在获取 read 锁,当您 read 时,您正在获取 write 锁。 HashMap 的行为未在并发修改下定义。您可能会导致正在使用该表的线程挂起。

此外,您应该使用try-finally 构造来确保无论发生任何错误都可以解锁。

如果您使用ConcurrentMap,线程可以在不获取整个表的锁的情况下进行更新,这就是您正确应用读写锁时要做的事情。

  1. ReentrantReaderWriterLock 的这种用法只会从我之前的实现中替代我现在看到的标志的用法没有正确完成。但是,我仍然有HashMap 中的值对象不是volatile 的问题,所以[将]不同的线程......每个都有自己的本地缓存值副本,该副本与其他线程的值的本地缓存副本不同步?”

不,Lock 获取确实与其他线程同步,并且在获取锁之前发生的其他线程的更改将是可见的。如果您修复了锁定,HashMap 将正常工作,但它会通过在更新期间锁定整个表来实现。

【讨论】:

  • 嗨埃里克森,关于你的问题,我只需要所有线程的总和,但我使用 HashMap 和每个线程的键/值对,所以每个线程可以并行写入计数,然后只有当我需要我计算的总和,在此期间其他线程将无法修改其计数器。如果有更好/更快的方法,我愿意接受。我输入了使用ReentrantReaderWriterLock 的问题修改代码以及另外两个问题。你认为我应该和为什么还要使用ConcurrentHashMap 就足够了吗?
  • @Xyster 我用一个例子更新了我的答案。 ReentrantReaderWriterLock 比必要的复杂,尤其是如果您只想要总和。
  • 嗨,埃里克森,我读了你的例子,乍一看它看起来很棒。但这对我不起作用,因为我需要这个程序与 Java 1.7 兼容,而且我看到 LongAccumulator 是从 Java 1.8 开始的。那么如何实现我所需要的并在 Java 1.7 中工作?
  • @Xyster 您可以改用AtomicLong(或AtomicInteger)。在高竞争下它的性能可能不如LongAccumulator,但它仍然比锁定HashMap 更新要好。请注意,Java 7 在一年多前就已报废,因此获取安全补丁等可能会很昂贵。
  • 当 N 个线程中的一个线程(仅仅增加一个特定键/值对的计数器足够)时读取锁不够,因为 N 个线程中的每个线程都将始终使用一个相同的线程哈希图中的键值对,因此没有多个 N 线程使用相同的键值对,这意味着从单个键值对的角度来看没有线程并发?并且只有读取所有值并在读取后重置它们的线程 X 与 N 个线程中的每一个并发,这意味着我需要一个写锁。关于try-finally,我同意并会添加它。
【解决方案2】:

是的,你需要volatile,没有它就不能保证其他线程会看到变化; java 可能(并且通常会)在另一个线程中本地缓存该值,因此在本地更改它不会对等待它的其他线程产生任何影响。

例如,这一行:

this.MMLCounterLocked = true;

如果字段不为volatile,则无用。


您为什么要尝试实现自己的锁?只需通过在静态字段或静态方法上同步来使用 java 的锁定。

【讨论】:

  • 嗨 Bohemian,首先,很抱歉变量名不符合 Java 约定。我使用不同的语言(Java、C、C++、C#、Python、Perl、Bash、TTCN 等)进行编程,所以有时我需要一些热身来记住每种语言的约定和规则。无论如何,我将使用ReentrantReaderWriterLock 的问题修改代码以及另外两个问题放入其中。您怎么看?您能否回答其他问题以获得更多说明?
【解决方案3】:

您可能希望使用允许多个读取器和一个写入器独占访问的锁定方案,例如java.util.concurrent.locks.ReentrantReadWriteLock,而不是尝试烘焙自己的同步机制。

The docs say

ReadWriteLock 维护一对关联的锁,一个用于 只读操作和一个用于写入的操作。读锁可能被持有 同时由多个阅读器线程,只要没有 作家。写锁是独占的。

【讨论】:

  • 嗨,Rob,这正是我所需要的;直到现在才知道 ReadWriteLock 机制。谢谢你。我输入了使用ReentrantReaderWriterLock 的问题修改代码以及另外两个问题。你怎么看,你能评论我发布的另外两个问题吗?
【解决方案4】:

结合罗布和埃里克森所说的话:

使用ConcurrentHashMap 这是线程安全的。

使用ReentrantReaderWriterLock,让所有正在编辑地图的线程获取ReadLock,这样他们就可以同时进行编辑。锁的读取端是共享锁。那么需要整个地图的一致视图的线程应该使用WriteLock来获得对地图的独占访问权。

您需要ConcurrentHashMap 的原因是所有线程都可以看到对地图的所有编辑,包括新条目。在这里使用ReentrantReaderWriterLock 的读取端是不够的。

您应该使用 ReentrantReaderWriterLock 的原因是,当线程 X 需要对映射的独占访问时,它可以通过使用锁的写入方来获得它。

这里最大的问题是在increaseCounter 中存在一个不能保证是原子的依赖项。你得到这个值然后你增加它。当两个线程调用此方法并且都获得相同的值然后增加它时会发生什么。如果这是一个银行账户,并且有人因为比赛条件损失了一分钱,那么......你明白了。这里有一个不变量没有得到支持,需要一些更细粒度的原子性。考虑使用AtomicInteger 作为值方,或者查看Atomic 包中的所有优秀类,这些类提供具有原子操作的单个变量。

【讨论】:

  • 嗨冒名顶替者,我输入了使用ReentrantReaderWriterLock 的问题修改代码以及另外两个问题。没有使用 ConcurrentHashMap 因为我不确定我是否真的需要它。您认为我是否应该以及为什么还要使用ConcurrentHashMap 以及为什么关于我发布的另外两个问题?
  • 我更新了我的答案。如果有帮助,请随时投票!
  • IncreaseCounter 保证是原子的,因为当每个 N 线程调用此函数时,它会为不同的键调用它。因此每个 N 线程将使用不同的键值对,这意味着两个 N 线程永远不会可以读取然后增加相同的值。您还说使用读取锁不足以确保在一个线程中进行的 HashMap 中的值更改在另一个线程中可见,@erickson 在他的回答中写了相反的内容,引用他的话:"不,锁获取确实与其他线程同步,其他线程在获取锁之前发生的更改将是可见的"
  • IncreaseCounter 不是原子的。它不是一步完成的。它需要几次操作才能发生。当任何未来的维护者对您的代码进行细微更改时,允许多个线程访问键值对。然后它坏了。关于同步的巧妙推理往往会导致失败。锁的读取端不足以确保可见性的原因是,如果您在持有读取锁的同时对映射进行更改,其他线程可能会同时更改它,因为它是共享的。 RW锁的原因是为了给线程X独占访问权限
猜你喜欢
  • 2011-07-07
  • 1970-01-01
  • 1970-01-01
  • 2016-05-31
  • 2012-02-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-01-26
相关资源
最近更新 更多