【问题标题】:Java thread-safe passing of collection objects from one thread to anotherJava 线程安全地将集合对象从一个线程传递到另一个线程
【发布时间】:2013-01-26 13:16:50
【问题描述】:

我有一个 Java 应用程序,它有工作线程来处理作业。一个工人产生一个结果对象,比如:

class WorkerResult{
    private final Set<ResultItems> items;
    public Worker(Set<ResultItems> pItems){
         items = pItems;
    }
}

当工作人员完成时,它会执行以下操作:

 ...
 final Set<ResultItems> items = new SomeNonThreadSafeSetImplSet<ResultItems>();
 for(Item producedItem : ...){
      items.add(item);
 }
 passToGatherThread(items);

items 集在这里是一种“工作单元”。 passToGatherThread 方法将items 集合传递给一个收集线程,其中只有一个在运行时存在。

这里不需要同步,因为只有一个线程(Gather-thread)读取items 集,所以不会发生竞争条件。 AFAICS,Gather-thread 可能看不到所有项目,因为该集合不是线程安全的,对吧?

假设我不能使 passToGatherThread 同步,因为它是一个 3rd 方库。我基本上担心的是,由于缓存、VM 优化等原因,收集线程看不到所有项目。所以问题来了:如何以线程安全的方式传递项目集,以便收集线程“看到”合适的项目集?

【问题讨论】:

  • 我不确定,但也许PipedStreams 可以帮助你?
  • passToGatherthread的定义是什么?我认为这对于理解下面给出的答案是否正确至关重要。 items 究竟是如何传递给收集线程的?
  • 您是否真的对对象的可见性有问题,或者您只是怀疑这种行为?在这种情况下,您的担忧似乎有些牵强。
  • 我怀疑有这样的行为……到目前为止我还没有经历过,但这并不意味着不会有这样的行为。我想确定一下,这就是我问的原因。
  • 并发错误真的很难被发现。仅仅因为 xSNRG 到目前为止没有体验过它们并不意味着没有。在具有不同内核集的不同 VM 或机器上,情况可能完全不同。

标签: java thread-safety


【解决方案1】:

这里似乎没有同步问题。您为每个 passToGatherThread 创建一个新的 Set 对象,并在修改集合后执行此操作。不会丢失任何对象。

如果不对集合进行任何修改,许多线程可以同时访问 Set(和大多数 Java 集合)。这就是Collections.unmodifiableCollection 的用途。

由于上面提到的passToGatherThread方法作为与其他线程的通信,它必须使用某种同步——并且每次同步确保线程之间的内存一致性。

另外 - 请注意,所有对传递集合中对象的写入都是在 传递给另一个线程之前进行的。即使内存被复制到线程的本地缓存中,它也具有与其他线程相同的未修改值。

【讨论】:

  • AFAIK 允许 JVM 尽可能在处理器核心寄存器中缓存数据。当写入数据而不“刷新”出来(使用同步/易失性/等)时,其他线程可能会看到“陈旧”值甚至根本看不到,因为“之前发生”语义仅在产生数据的线程中有效。
  • @xSNRG 添加了一个段落。只要 pass 方法有效,就不用担心了。
  • 此外 - 如果这不起作用,那么很多应用程序将无法正常工作。
  • 我担心的在这里更准确地描述在 keyworkd“陈旧”下。 securecoding.cert.org/confluence/display/java/…
  • Set 将传递的值存储为私有成员变量,这些变量不是线程安全的,因为线程 A 存储它们而线程 B 读取它们,没有同步和任何进一步的可见性问题,避免了机制。
【解决方案2】:

我对这个问题思考(并讨论)了很多,我想出了另一个答案,我希望这将是最好的解决方案。

传递一个同步的集合在效率方面并不好,因为该集合上的每个后续操作都会被同步 - 如果有很多操作,它可能会被证明是一个障碍。

直截了当:让我们做一些假设(我不同意):

  • 提到的passToGatherThread 方法确实不安全,但看起来不太可能
  • 编译器可以对代码中的事件重新排序,以便在填充集合之前调用passToGatherThread

确保传递给gatherer方法的集合已准备好和完整的最简单、最干净和可能最有效的方法是将集合推送放在一个同步块中,如下所示:

synchronized(items) {
  passToGatherThread(items);
}

这样我们在传递集合之前确保内存同步和有效的happens-before序列,从而确保所有对象都正确传递。

【讨论】:

  • 我认为你是对的,最后这正是我遇到这个问题的方式。它看起来有点丑不是吗?在一个新创建的集合上同步,对大多数人来说这看起来很傻...... ;-) 谢谢你的回答。
【解决方案3】:

您可以简单地使用 Java 为您的 WorkerResult 提供的 Set 的线程安全实现之一。例如:

另一种选择是使用Collections.synchronizedSet()

【讨论】:

  • 为什么需要这个?使用非线程安全的Set 实现有什么问题?
  • 我认为 OP 害怕缓存。线程可能会缓存非volatile 数据,如果对其的写入未同步,则永远不会看到更新。
  • 是的,joergl 是绝对正确的。请参阅我在 Dariusz Wawer 答案下方的评论。
  • 鉴于 xSNRG 在这里编写的所有内容,unmodifiableSet 不会解决任何问题 - 它不提供任何同步。
  • @Dariusz Wawer:你是对的,谢谢! unmodifiableSet 不保证同步,因此也不保证可见性。对原始集的修改可能在不可变副本中不可见。我调整了答案。
【解决方案4】:

worker实现callable并返回WorkerResult:

class Worker implements Callable<WorkerResult> {
    private WorkerInput in;

    public Worker(WorkerInput in) {
        this.in = in;
    }

    public WorkerResult call() {
        // do work here
    }
}

然后我们使用一个ExecutorService来管理线程池,并通过使用Future来收集结果。

public class PooledWorkerController {

    private static final int MAX_THREAD_POOL = 3;
    private final ExecutorService pool = 
       Executors.newFixedThreadPool(MAX_THREAD_POOL);

    public Set<ResultItems> process(List<WorkerInput> inputs) 
           throws InterruptedException, ExecutionException{         
        List<Future<WorkerResult>> submitted = new ArrayList<>();
        for (WorkerInput in : inputs) {
            Future<WorkerResult> future = pool.submit(new Worker(in));
            submitted.add(future);
        }
        Set<ResultItems> results = new HashSet<>();
        for (Future<WorkerResult> future : submitted) {
            results.addAll(future.get().getItems());
        }
        return results;
    }
}

【讨论】:

  • 因此每次启动作业时都会创建一个线程。 -1 。
  • 在您的帖子中,您介绍了每次调用一组作业时创建线程的不良做法。线程创建非常昂贵,并且只要有可能,线程应该创建一次并重用。如果您有一个将在process 中使用的静态初始化线程池,那么您的代码将在 IMO 中变得更好。为每个process 调用创建一个新线程池并没有太大变化。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-09-16
  • 1970-01-01
  • 1970-01-01
  • 2021-08-04
  • 2015-06-12
相关资源
最近更新 更多