【问题标题】:How to populate entries into a map from a different thread and then from a single background thread iterate the map and send?如何从不同的线程将条目填充到地图中,然后从单个后台线程迭代地图并发送?
【发布时间】:2017-08-05 16:06:27
【问题描述】:

我有一个下面的类,其中我有一个add 方法,该方法由另一个线程调用以填充我的clientidToTimestampHolder 多图。然后在下面的同一个类中,我启动了一个每 60 秒运行一次的后台线程,并调用 processData() 方法,该方法迭代同一个地图并将所有这些数据发送到其他一些服务。

public class Handler {
  private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
  private final Multimap<String, Long> clientidToTimestampHolder = ArrayListMultimap.create();

  private static class Holder {
    private static final Handler INSTANCE = new Handler();
  }

  public static Handler getInstance() {
    return Holder.INSTANCE;
  }

  private Handler() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        processData();
      }
    }, 0, 60, TimeUnit.SECONDS);
  }

  // called by another thread to populate clientidToTimestampHolder map
  public void add(final String clientid, final Long timestamp) {
    clientidToTimestampHolder.put(clientid, timestamp);
  }

  // called by background thread
  public void processData() {
    for (Entry<String, Collection<Long>> entry : clientidToTimestampHolder.asMap().entrySet()) {
      String clientid = entry.getKey();
      Collection<Long> timestamps = entry.getValue();
      for (long timestamp : timestamps) {
        boolean isUpdated = isUpdatedClient(clientid, timestamp);
        if (!isUpdated) {
          updateClient(String.valueOf(clientid));
        }
      }
    }
  }
}

我的问题是,add 方法每次都会被不同的线程调用。那么我是否需要创建clientidToTimestampHolder 映射的副本并将该副本作为参数传递给processData() 方法,而不是直接处理该映射?

因为现在我使用同一个映射来填充其中的数据,然后还迭代同一个映射以将内容发送到其他服务,所以我不会从该映射中删除数据,因此这些条目将始终存在于该映射中.

解决此问题的最佳方法是什么?而且我需要确保它是线程安全的并且没有竞争条件,因为我不能丢失任何clientid

更新

所以我的processData 方法会是这个样子?

  public void processData() {
    synchronized (clientidToTimestampHolder) {
      Iterator<Map.Entry<String, Long>> i = clientidToTimestampHolder.entries().iterator();
      while (i.hasNext()) {
        String clientid = i.next().getKey();
        long timestamp = i.next().getValue();
        boolean isUpdated = isUpdatedClient(clientid, timestamp);
        if (!isUpdated) {
          updateClient(clientid);
        }
        i.remove();
      }
    }
  }

【问题讨论】:

  • 您更新了processData 是错误的,因为它每次hasNext() 检查都会调用Iterator#next() 两次。
  • 是的,明白了。所以我可以clear 同步块内的地图,同时按照您的建议使用 for 循环进行迭代,而不是在迭代器上使用 remove。
  • 要么clear 在我的帖子中的同步块中,要么在您更新的示例中提取条目:while (i.hasNext()) { Entry&lt;String, Long&gt; e = i.next(); String clientid = e.getKey(); long timestamp = e.getValue(); // etc.。或者使用BlockingQueue,正如我在编辑后的帖子中所建议的那样。

标签: java multithreading thread-safety guava multimap


【解决方案1】:

使用Multimaps.synchronized(List)Multimap 包装器对多重映射进行线程安全引用(ArrayListMultimapListMultimap,即将值存储在列表中):

private final ListMultimap<String, Long> clientidToTimestampHolder = 
    Multimaps.synchronizedListMultimap(ArrayListMultimap.create());

请注意,同步多地图包装器有以下警告:

当访问它的任何集合视图时,用户必须在返回的多图上手动同步:

// ...  

不遵循此建议可能会导致不确定的行为。

在您的情况下,您必须手动同步条目视图的迭代,因为它的迭代器不同步:

public void processData() {
  synchronized (clientidToTimestampHolder) {
    for (Map.Entry<String, Long> entry : clientidToTimestampHolder.entries()) {
      String clientid = entry.getKey();
      long timestamp = entry.getValue();
      boolean isUpdated = isUpdatedClient(clientid, timestamp);
      if (!isUpdated) {
        updateClient(String.valueOf(clientid));
      }
    }
    clientidToTimestampHolder.clear();
  }
}

(我使用Mutlimap.entries() 而不是Multimap.asMap().entrySet(),因为这样更干净)。

此外,如果您想知道为什么没有通用的 ConcurrentXxxMultimap 实现,请参阅 Guava's issue #135this comment quoting internal discussion about this

我尝试构建一个通用的并发多图,结果变成了 在一小部分使用中会稍微快一些,但要慢得多 在大多数用途中(与同步多图相比)。我专注于 使尽可能多的操作原子化;较弱的合同将 消除一些这种缓慢,但也会减损它的 有用。

我认为 Multimap 界面太“大”,无法支持 高效的并发实现 - 排序或其他。 (清楚地, 这是夸大其词,但至少它需要 大量的工作或多地图界面的松动。)

编辑:

阅读您的 cmets,对我来说似乎是 XY Problem。话虽如此,IMO 你不应该在这里使用Multimap,因为你不使用它的任何功能,而是使用BlockingQueue,它有一个方便的drainTo(Collection) 方法(并且是线程安全的):

private final LinkedBlockingQueue<Map.Entry<String, Long>> clientidToTimestampHolder =
    new LinkedBlockingQueue<>();

public void add(final String clientid, final Long timestamp) {
  clientidToTimestampHolder.offer(Maps.immutableEntry(clientid, timestamp));
}

public void processData() {
  final List<Map.Entry<String, Long>> entries = new ArrayList<>();
  clientidToTimestampHolder.drainTo(entries);
  for (Map.Entry<String, Long> entry : entries) {
    String clientid = entry.getKey();
    long timestamp = entry.getValue();
    boolean isUpdated = isUpdatedClient(clientid, timestamp);
    if (!isUpdated) {
      updateClient(String.valueOf(clientid));
    }
  }
}

您可以(应该?)为您的数据创建自己的值类来存储 Stringlong 字段并使用它来代替通用的 Map.Entry&lt;String, Long&gt;

【讨论】:

  • 另外,一旦我完成处理,如何从地图中删除条目?那么我需要创建clientidToTimestampHolder 地图的副本,然后将其传递给processData 地图吗?基本上我想每 60 秒处理一次 clientidToTimestampHolder 地图中的任何内容
  • processData 方法仅由单个后台线程每 60 秒调用一次,那为什么我们需要同步呢?
  • @user1234 因为另一个线程可能会在迭代期间修改 multimap,导致(可能)到 ConcurrentModificationException 并且您希望在此处具有确定性行为(文档提到 未能遵循此建议可能会导致在非确定性行为中)。
  • 好的,现在知道了。那么删除已处理的条目怎么样,否则每 60 秒后它会再次开始处理旧条目?正确的?基本上我想每 60 秒处理一次地图中的所有内容
  • 你可以在同步块内做clientidToTimestampHolder.clear(),在迭代之后。但如果这是您的用例,那么您根本不需要多图,请使用BlockingQueue。我会更新我的答案以表明我的意思。
【解决方案2】:

现在,使用您的代码,您将主要观察您的地图是否不一致,因为在一次迭代中,您的 Map 中可能有 [1: "value1",2: "value2",3: "value3"],而下一次迭代您的 Map 可能是 [1: "value1",2: "value2",3: "value3", 4: "value4"]。主要问题是我相信 MultiMap 并不能确保您的元素排队顺序(请参阅this post),因此您可以在迭代期间跳过一个元素(由您决定它是否危险)

如果你真的需要停止每个 put 操作,你确实可以使用 @Xaerxess 方法在 processData() 中同步地图。你提到的另一种可能性是制作一些defensive copying,基本上是迭代你的MultiMap的快照,首先你会这样做:

public Multimap<String, Long> getClientidToTimestampHolder(){
    return ImmutableSetMultimap.copyOf(clientidToTimestampHolder);
}

并且迭代将在这个快照上完成:

 public void processData() {
    Multimap<String, Long> tmpClientToTimestampHolder = getClientidToTimestampHolder();
    for (Entry<String, Collection<Long>> entry : tmpClientToTimestampHolder.asMap().entrySet()) {
      String clientid = entry.getKey();
      Collection<Long> timestamps = entry.getValue();
      for (long timestamp : timestamps) {
        boolean isUpdated = isUpdatedClient(clientid, timestamp);
        if (!isUpdated) {
          updateClient(String.valueOf(clientid));
        }
      }
    }
  }

看到你对删除的评论,你会想做一个同步块来做到这一点atomically

synchronized (clientidToTimestampHolder){
            clientidToTimestampHolder.remove(key, value);//fill key,value, or use removAll(key)
}

为什么需要同步?因为如果你想在时间 t 有准确的映射,那么你需要阻止其他线程向它添加元素。这是通过 Java 中的locking 完成的,因此只要一个线程(这里是您的后台线程)在地图上获得锁,在您读取多地图时,其他线程将无法访问该多地图。

【讨论】:

  • 我不能跳过任何元素,所以我需要确保我正在迭代所有元素,一旦我完成处理,它应该从地图中删除,而不是永远保留在那里。那么您认为最好的方法是什么?
  • 而删除我将添加该代码的位置?它会在 processData 方法中吗?
  • 在这种情况下,我会使用@Xaerxess 的答案并在迭代的集合上同步,并在同一个同步块内,放置remove 方法(所以在processData 中是)。因为制作防御性副本是可以的,只要不需要快照就足够了,如果需要在迭代过程中停止添加元素,那就不够了,您需要锁定对地图的所有访问权限。
  • 在我的问题中添加了更新。这就是你的意思?
  • 我就是这个意思!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-10-19
  • 2020-05-19
相关资源
最近更新 更多