【问题标题】:AtomicReference to a mutable object and visibility对可变对象和可见性的 AtomicReference
【发布时间】:2012-03-11 19:24:15
【问题描述】:

假设我有一个AtomicReference到一个对象列表:

AtomicReference<List<?>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>());

线程 A 将元素添加到此列表中:batch.get().add(o);

稍后,线程 B 获取该列表,例如,将其存储在 DB 中:insertBatch(batch.get());

我是否必须在写入(线程 A)和读取(线程 B)时进行额外的同步,以确保线程 B 以 A 离开它的方式看到列表,还是由 AtomicReference 处理?

换句话说:如果我有一个可变对象的 AtomicReference,并且一个线程更改了该对象,其他线程是否会立即看到此更改?

编辑:

也许一些示例代码是有序的:

public void process(Reader in) throws IOException {
    List<Future<AtomicReference<List<Object>>>> tasks = new ArrayList<Future<AtomicReference<List<Object>>>>();
    ExecutorService exec = Executors.newFixedThreadPool(4);

    for (int i = 0; i < 4; ++i) {
        tasks.add(exec.submit(new Callable<AtomicReference<List<Object>>>() {
            @Override public AtomicReference<List<Object>> call() throws IOException {

                final AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize));

                Processor.this.parser.parse(in, new Parser.Handler() {
                    @Override public void onNewObject(Object event) {
                            batch.get().add(event);

                            if (batch.get().size() >= batchSize) {
                                dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));
                            }
                    }
                });

                return batch;
            }
        }));
    }

    List<Object> remainingBatches = new ArrayList<Object>();

    for (Future<AtomicReference<List<Object>>> task : tasks) {
        try {
            AtomicReference<List<Object>> remainingBatch = task.get();
            remainingBatches.addAll(remainingBatch.get());
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();

            if (cause instanceof IOException) {
                throw (IOException)cause;
            }

            throw (RuntimeException)cause;
        }
    }

    // these haven't been flushed yet by the worker threads
    if (!remainingBatches.isEmpty()) {
        dao.insertBatch(remainingBatches);
    }
}

这里发生的是我创建了四个工作线程来解析一些文本(这是process() 方法的Reader in 参数)。每个工作人员将其已解析的行保存在一个批处理中,并在批处理已满时刷新该批处理 (dao.insertBatch(batch.getAndSet(new ArrayList&lt;Object&gt;(batchSize)));)。

由于文本中的行数不是批次大小的倍数,因此最后一个对象最终会出现在未刷新的批次中,因为它未满。因此,这些剩余批次由主线程插入。

我使用AtomicReference.getAndSet() 将整个批次替换为空批次。这个程序在线程方面正确吗?

【问题讨论】:

    标签: java multithreading concurrency mutable atomicreference


    【解决方案1】:

    我认为,忘记这里的所有代码,你的确切问题是:

    在编写(线程 A)和 阅读(线程 B)以确保线程 B 以 A 离开它的方式看到列表, 还是由 AtomicReference 处理?

    因此,对此的确切响应是:YES,原子处理可见性。这不是我的意见,而是JDK documentation one

    原子访问和更新的内存效应通常遵循 volatile 的规则,如 Java 语言规范第三版(17.4 内存模型)中所述。

    我希望这会有所帮助。

    【讨论】:

      【解决方案2】:

      AtomicReference 只会帮助您参考列表,它不会对列表本身做任何事情。更具体地说,在您的场景中,当系统处于负载状态时,您几乎肯定会遇到问题,即消费者已获取列表,而生产者正在向其添加项目。

      这听起来像你应该使用BlockingQueue。然后,如果您的生产者比您的消费者快,您可以限制内存占用并让队列处理所有争用。

      类似:

      ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (50);
      
      // ... Producer
      queue.put(o);
      
      // ... Consumer
      List<Object> queueContents = new ArrayList<Object> ();
      // Grab everything waiting in the queue in one chunk. Should never be more than 50 items.
      queue.drainTo(queueContents);
      

      已添加

      感谢@Tudor 指出您正在使用的架构。 ...我不得不承认这很奇怪。据我所知,您根本不需要AtomicReference。每个线程都拥有自己的ArrayList,直到它被传递给dao,此时它被替换,因此在任何地方都没有争用。

      我有点担心您在单个 Reader 上创建四个解析器。我希望你有办法确保每个解析器不会影响其他解析器。

      我个人会使用上面代码中描述的某种形式的生产者-消费者模式。可能是这样的。

      static final int PROCESSES = 4;
      static final int batchSize = 10;
      
      public void process(Reader in) throws IOException, InterruptedException {
      
        final List<Future<Void>> tasks = new ArrayList<Future<Void>>();
        ExecutorService exec = Executors.newFixedThreadPool(PROCESSES);
        // Queue of objects.
        final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (batchSize * 2);
        // The final object to post.
        final Object FINISHED = new Object();
      
        // Start the producers.
        for (int i = 0; i < PROCESSES; i++) {
          tasks.add(exec.submit(new Callable<Void>() {
            @Override
            public Void call() throws IOException {
      
              Processor.this.parser.parse(in, new Parser.Handler() {
                @Override
                public void onNewObject(Object event) {
                  queue.add(event);
                }
              });
              // Post a finished down the queue.
              queue.add(FINISHED);
              return null;
            }
          }));
        }
      
        // Start the consumer.
        tasks.add(exec.submit(new Callable<Void>() {
          @Override
          public Void call() throws IOException {
            List<Object> batch = new ArrayList<Object>(batchSize);
            int finishedCount = 0;
            // Until all threads finished.
            while ( finishedCount < PROCESSES ) {
              Object o = queue.take();
              if ( o != FINISHED ) {
                // Batch them up.
                batch.add(o);
                if ( batch.size() >= batchSize ) {
                  dao.insertBatch(batch);
                  // If insertBatch takes a copy we could merely clear it.
                  batch = new ArrayList<Object>(batchSize);
                }
              } else {
                // Count the finishes.
                finishedCount += 1;
              }
            }
            // Finished! Post any incopmplete batch.
            if ( batch.size() > 0 ) {
              dao.insertBatch(batch);
            }
            return null;
          }
        }));
      
        // Wait for everything to finish.
        exec.shutdown();
        // Wait until all is done.
        boolean finished = false;
        do {
          try {
            // Wait up to 1 second for termination.
            finished = exec.awaitTermination(1, TimeUnit.SECONDS);
          } catch (InterruptedException ex) {
          }
        } while (!finished);
      }
      

      【讨论】:

      • 嗯...从他的代码来看,它看起来不像生产者-消费者。他实际上是在生成一组线程,每个线程都做一些工作,然后加入它们并在主线程中完成工作。线程之间没有实际的数据传递。
      【解决方案3】:

      嗯...它真的不是这样工作的。 AtomicReference 保证引用本身在线程中是可见的,即,如果您为其分配与原始引用不同的引用,则更新将可见。它不保证引用指向的对象的实际内容。

      因此,对列表内容的读/写操作需要单独同步。

      编辑:因此,从您更新的代码和您发布的评论来看,将本地引用设置为volatile 足以确保可见性。

      【讨论】:

      • 好的,我在上面的问题中添加了一些示例代码。我使用AtomicReference.getAndSet() 用新鲜的空批次替换完整批次。我还需要额外的同步吗?
      • 是的,您的代码是正确的,虽然这里似乎不需要使用AtomicReference
      • @Tudor 也在想同样的事情。实际上 getAndSet() 可能不会做他想做的,因为它会获取当前值,然后更改 AtomicReference 的值。
      • @Tudor:是的,我也是这么想的。但问题是批处理必须声明为final 才能在我的(匿名)处理程序的onNewObject() 方法中使用它。无论如何,我决定放弃匿名内部 Callable 类,转而使用实现 Callable 的更具可读性和可测试性的 ParseAndInsertTask 类,并将批处理作为任务的实例变量,而不是局部变量。
      • 请注意,如果您不需要进行原子比较和交换操作,您可以只使用volatile 变量而不是AtomicReference
      【解决方案4】:

      添加到Tudor 的答案:您必须使ArrayList 本身是线程安全的,或者 - 根据您的要求 - 甚至更大的代码块。

      如果您可以摆脱线程安全ArrayList,您可以像这样“装饰”它:

      batch = java.util.Collections.synchronizedList(new ArrayList<Object>());
      

      但请记住:即使是这样的“简单”构造也不是线程安全的:

      Object o = batch.get(batch.size()-1);
      

      【讨论】:

        猜你喜欢
        • 2016-03-29
        • 2021-02-27
        • 2016-12-23
        • 2011-10-12
        • 2011-02-23
        • 1970-01-01
        • 2010-12-26
        • 2011-08-01
        • 2010-11-21
        相关资源
        最近更新 更多