【问题标题】:Sending data to a database in size-limited chunks以大小有限的块将数据发送到数据库
【发布时间】:2017-10-23 21:49:40
【问题描述】:

我有一个方法,它的参数是Partition 枚举。通过传递不同的partition 值,在同一时间段内,多个后台线程(最多 15 个)将调用此方法。这里dataHoldersByPartitionPartitionConcurrentLinkedQueue<DataHolder> 的映射。

  private final ImmutableMap<Partition, ConcurrentLinkedQueue<DataHolder>> dataHoldersByPartition;

  //... some code to populate entry in `dataHoldersByPartition`

  private void validateAndSend(final Partition partition) {  
    ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
    Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
    int totalSize = 0;      
    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll())  != null) {      
      byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
      if (clientKeyBytes.length > 255)
        continue;

      byte[] processBytes = dataHolder.getProcessBytes();
      int clientKeyLength = clientKeyBytes.length;
      int processBytesLength = processBytes.length;

      int additionalLength = clientKeyLength + processBytesLength;
      if (totalSize + additionalLength > 50000) {
        Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
        // here size of `message.serialize()` byte array should always be less than 50k at all cost
        sendToDatabase(message.getAddress(), message.serialize());
        clientKeyBytesAndProcessBytesHolder = new HashMap<>();
        totalSize = 0;
      }
      clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
      totalSize += additionalLength;
    }
    // calling again with remaining values only if clientKeyBytesAndProcessBytesHolder is not empty
    if(!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
        Message message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
        // here size of `message.serialize()` byte array should always be less than 50k at all cost
        sendToDatabase(message.getAddress(), message.serialize());      
    }
  }

下面是我的Message类:

public final class Message {
  private final byte dataCenter;
  private final byte recordVersion;
  private final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;

  public Message(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder, Partition recordPartition) {
    this.clientKeyBytesAndProcessBytesHolder = clientKeyBytesAndProcessBytesHolder;
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  // Output of this method should always be less than 50k always
  public byte[] serialize() {
    int bufferCapacity = getBufferCapacity(clientKeyBytesAndProcessBytesHolder); // 36 + dataSize + 1 + 1 + keyLength + 8 + 2;

    ByteBuffer byteBuffer = ByteBuffer.allocate(bufferCapacity).order(ByteOrder.BIG_ENDIAN);
    // header layout
    byteBuffer.put(dataCenter).put(recordVersion).putInt(clientKeyBytesAndProcessBytesHolder.size())
        .putInt(bufferCapacity).putLong(address).putLong(addressFrom).putLong(addressOrigin)
        .put(recordsPartition).put(replicated);

    // now the data layout
    for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
      byte keyType = 0;
      byte[] key = entry.getKey();
      byte[] value = entry.getValue();
      byte keyLength = (byte) key.length;
      short valueLength = (short) value.length;

      ByteBuffer dataBuffer = ByteBuffer.wrap(value);
      long timestamp = valueLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();

      byteBuffer.put(keyType).put(keyLength).put(key).putLong(timestamp).putShort(valueLength)
          .put(value);
    }
    return byteBuffer.array();
  }

  private int getBufferCapacity(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
    int size = 36;
    for (Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
      size += 1 + 1 + 8 + 2;
      size += entry.getKey().length;
      size += entry.getValue().length;
    }
    return size;
  }

    // getters and to string method here
}

基本上,我必须确保无论何时调用 sendToDatabase 方法,message.serialize() 字节数组的大小都应始终小于 50k。我的sendToDatabase 方法发送来自serialize 方法的字节数组。由于这种情况,我正在做低于验证以及其他一些事情。在该方法中,我将迭代dataHolders CLQ 并从中提取clientKeyBytesprocessBytes。这是我正在做的验证:

  • 如果clientKeyBytes 的长度大于255,那么我将跳过它并继续迭代。
  • 我将不断增加totalSize 变量,这将是clientKeyLengthprocessBytesLength 的总和,并且这个totalSize 的长度应始终小于50000 字节。
  • 一旦达到 50000 限制,我会将 clientKeyBytesAndProcessBytesHolder 映射发送到 sendToDatabase 方法并清除映射,将 totalSize 重置为 0 并重新开始填充。
  • 如果它没有达到该限制并且dataHolders 为空,那么它将发送它拥有的任何东西。

我相信我当前的代码中存在一些错误,因为我的状况可能导致某些记录没有被正确发送或丢弃在某个地方,我无法弄清楚这一点。看起来要正确实现这个 50k 条件我可能必须在调用 sendToDatabase 方法之前使用 getBufferCapacity 方法来正确计算大小?

【问题讨论】:

  • 嗯好吧,你现在怎么想?基本上,我必须确保无论何时调用sendToDatabase 方法,message.serialize() 字节数组的大小应始终小于 50k。为此,我认为我需要准确计算传递给Message 类的clientKeyBytesAndProcessBytesHolder 映射中的所有内容。
  • 两个可能会导致问题的事情浮现在脑海中:1) 每个Partition 是否只有一个DataHolder 具有相同的客户端密钥?如果不是,则只有最后存储的字节会在地图中。 2) 是否有单个DataHolder 的进程字节> 50k?很高兴知道dataHoldersByPartition 中的数据类型。
  • 你已经解决了吗?
  • @MickMnemonic 还没有。我仍然坚持这一点。因此,根据您的第 2 点,有可能(可能是机器人)我们可以拥有单个 DataHolderprocess bytes &gt; 50k,在这种情况下,我将按原样删除该记录并移至下一个记录,所以也许我会添加对 process bytes 长度的检查,如果超过 50k,则丢弃。对吗?

标签: java multithreading data-structures concurrency queue


【解决方案1】:

我检查了您的代码,按照您的逻辑,它看起来不错。正如您所说,它将始终存储小于 50K 的信息,但它实际上会将信息存储到 50K。要使其小于 50K,您必须将 if 条件更改为 if (totalSize + additionalLength &gt;= 50000)

如果您的代码仍然无法满足您的要求,即当 totalSize + additionalLength 大于 50k 时存储信息,我可以建议您很少考虑。

由于超过 50 个线程调用此方法,您需要考虑代码中的两个部分进行同步。 一个是全局变量,它是一个容器dataHoldersByPartition 对象。如果在此容器对象中发生多个并发和并行搜索,结果可能并不完美。只需检查容器类型是否同步。如果没有像下面这样制作这个块:-

synchronized(this){
    ConcurrentLinkedQueue<DataHolder> dataHolders =  dataHoldersByPartition.get(partition);
}

现在,我只能给出两个建议来解决这个问题。一种是代替if (totalSize + additionalLength &gt; 50000),您可以检查对象clientKeyBytesAndProcessBytesHolderif(sizeof(clientKeyBytesAndProcessBytesHolder) &gt;= 50000) 的大小(检查java 中sizeof 的适当方法)。第二个是缩小范围以检查它是否是多线程的副作用。所有这些建议都是为了找出问题所在的区域,并且应该只从您的角度进行修复。

首先检查您的方法validateAndSend 是否完全满足您的要求。为此,首先同步整个validateAndSend 方法并检查一切是否正常或仍然具有相同的结果。如果仍然有相同的结果,这意味着它不是因为多线程,而是您的编码不符合要求。如果它工作正常,则意味着它是多线程的问题。如果方法同步正在解决您的问题但降低了性能,您只需从中删除同步并集中可能导致问题的代码的每个小块,并使其同步块并在仍未解决您的问题时删除。就像这样,最终您找到了实际产生问题的代码块,并将其保留为同步以最终修复它。

例如第一次尝试:-

  `private synchronize void validateAndSend`

第二次尝试:从方法中删除同步关键字并执行以下步骤:-

           synchronize(this){
            Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);                  
            sendToDatabase(message.getAddress(), message.serialize());
     }

如果您认为我没有正确理解您,请告诉我。

【讨论】:

  • 请完整阅读代码。不要只看状态。如果数据小于 50k,该条件将确保发送数据,因为我们在底部向 totalSize 添加了附加长度。
  • 你能告诉我你目前面临的所有问题。这个对象是dataHoldersByPartitionchynchronize 吗?
  • 我已经更新了我的问题,请从头再来看看。 dataHoldersByPartition 是 ImmutableMap 所以我相信它已经同步了。
  • 我的答案已经修改,请查收。
【解决方案2】:

在您的validateAndSend 中,我会将整个数据放入队列,并在单独的线程中进行整个处理。请考虑命令模型。这样所有线程都会将它们的负载放在队列中。消费者线程拥有所有数据,所有信息,并且可以非常有效地处理它。唯一复杂的部分是将响应/结果发送回调用线程。因为在你的情况下这不是问题 - 更好。这种模式还有更多好处 - 请查看netflix/hystrix

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-01-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多