Posted: 2017-10-23 21:49:40
Question:
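I have a method that takes a `Partition` enum as its parameter. Multiple background threads (up to 15) call this method during the same time period, each passing a different `partition` value. Here `dataHoldersByPartition` is a map from `Partition` to `ConcurrentLinkedQueue<DataHolder>`.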
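The setup described above (one queue per partition, drained by several threads at once) can be sketched on its own. This is a minimal illustration with a few assumptions. The three-value `Partition` enum is hypothetical, and `String` items stand in for `DataHolder`. An unmodifiable `EnumMap` replaces Guava's `ImmutableMap` only to stay within the JDK. The map is never mutated after construction and each queue is a `ConcurrentLinkedQueue`, so every polled item is handed to exactly one thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PartitionDrainSketch {
    // Hypothetical stand-in for the question's Partition enum.
    enum Partition { P0, P1, P2 }

    // Builds one pre-filled queue per partition; the returned map is read-only,
    // so it can be shared between threads without extra locking.
    static Map<Partition, ConcurrentLinkedQueue<String>> buildQueues(int itemsPerPartition) {
        Map<Partition, ConcurrentLinkedQueue<String>> queues = new EnumMap<>(Partition.class);
        for (Partition p : Partition.values()) {
            ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
            for (int i = 0; i < itemsPerPartition; i++) {
                queue.add(p + "-" + i);
            }
            queues.put(p, queue);
        }
        return Collections.unmodifiableMap(queues);
    }

    // Drains each partition's queue on a pool thread (15 threads, as in the question)
    // and returns the total number of items polled across all partitions.
    static int drainAll(Map<Partition, ConcurrentLinkedQueue<String>> queues) {
        ExecutorService pool = Executors.newFixedThreadPool(15);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (Partition p : Partition.values()) {
                results.add(pool.submit(() -> {
                    ConcurrentLinkedQueue<String> queue = queues.get(p);
                    int count = 0;
                    // poll() returns null once the queue is empty, like the loop in validateAndSend
                    while (queue.poll() != null) {
                        count++;
                    }
                    return count;
                }));
            }
            int total = 0;
            for (Future<Integer> result : results) {
                total += result.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(drainAll(buildQueues(1000))); // prints 3000
    }
}
```

If two threads happened to call the method with the same partition, they would both poll the same queue. That is still safe, because `poll()` hands each element to only one caller. In that case, though, the records of one partition would be split across two independently sized batches.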
```java
private final ImmutableMap<Partition, ConcurrentLinkedQueue<DataHolder>> dataHoldersByPartition;

// ... some code to populate entries in `dataHoldersByPartition`

private void validateAndSend(final Partition partition) {
    ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
    Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
    int totalSize = 0;
    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll()) != null) {
        byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
        if (clientKeyBytes.length > 255) {
            continue;
        }

        byte[] processBytes = dataHolder.getProcessBytes();
        int clientKeyLength = clientKeyBytes.length;
        int processBytesLength = processBytes.length;

        int additionalLength = clientKeyLength + processBytesLength;
        if (totalSize + additionalLength > 50000) {
            Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
            // here size of `message.serialize()` byte array should always be less than 50k at all cost
            sendToDatabase(message.getAddress(), message.serialize());
            clientKeyBytesAndProcessBytesHolder = new HashMap<>();
            totalSize = 0;
        }
        clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
        totalSize += additionalLength;
    }
    // send the remaining values, but only if clientKeyBytesAndProcessBytesHolder is not empty
    if (!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
        Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
        // here size of `message.serialize()` byte array should always be less than 50k at all cost
        sendToDatabase(message.getAddress(), message.serialize());
    }
}
```
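The map in `validateAndSend` is keyed by `byte[]`. Whether records that share a client key collapse into one entry depends on that key type. Java arrays inherit `equals` and `hashCode` from `Object`, so two keys with identical bytes remain two separate entries and nothing is overwritten. The sketch below uses illustrative class and method names. It contrasts array keys with keys wrapped in `ByteBuffer`, whose `equals`/`hashCode` compare the buffer's remaining contents.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ByteArrayKeySketch {
    // Arrays use identity-based equals/hashCode, so equal contents still produce two entries.
    static int sizeWithArrayKeys(String clientKey) {
        Map<byte[], byte[]> map = new HashMap<>();
        map.put(clientKey.getBytes(StandardCharsets.UTF_8), new byte[] {1});
        map.put(clientKey.getBytes(StandardCharsets.UTF_8), new byte[] {2});
        return map.size();
    }

    // ByteBuffer compares remaining content, so the second put replaces the first.
    static int sizeWithContentKeys(String clientKey) {
        Map<ByteBuffer, byte[]> map = new HashMap<>();
        map.put(ByteBuffer.wrap(clientKey.getBytes(StandardCharsets.UTF_8)), new byte[] {1});
        map.put(ByteBuffer.wrap(clientKey.getBytes(StandardCharsets.UTF_8)), new byte[] {2});
        return map.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeWithArrayKeys("client-1"));   // prints 2
        System.out.println(sizeWithContentKeys("client-1")); // prints 1
    }
}
```

A `ByteBuffer` key is mutable: if its position changes after insertion, its hash changes too. Content-based keys are therefore only safe if the buffers are never read or modified afterwards.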
Below is my `Message` class:
```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Map;

public final class Message {
    private final byte dataCenter;
    private final byte recordVersion;
    private final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder;
    private final long address;
    private final long addressFrom;
    private final long addressOrigin;
    private final byte recordsPartition;
    private final byte replicated;

    public Message(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder, Partition recordPartition) {
        this.clientKeyBytesAndProcessBytesHolder = clientKeyBytesAndProcessBytesHolder;
        this.recordsPartition = (byte) recordPartition.getPartition();
        this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
        this.recordVersion = 1;
        this.replicated = 0;
        long packedAddress = new Data().packAddress();
        this.address = packedAddress;
        this.addressFrom = 0L;
        this.addressOrigin = packedAddress;
    }

    // Output of this method should always be less than 50k
    public byte[] serialize() {
        int bufferCapacity = getBufferCapacity(clientKeyBytesAndProcessBytesHolder); // 36 + dataSize + 1 + 1 + keyLength + 8 + 2;
        ByteBuffer byteBuffer = ByteBuffer.allocate(bufferCapacity).order(ByteOrder.BIG_ENDIAN);
        // header layout
        byteBuffer.put(dataCenter).put(recordVersion).putInt(clientKeyBytesAndProcessBytesHolder.size())
                .putInt(bufferCapacity).putLong(address).putLong(addressFrom).putLong(addressOrigin)
                .put(recordsPartition).put(replicated);

        // now the data layout
        for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
            byte keyType = 0;
            byte[] key = entry.getKey();
            byte[] value = entry.getValue();
            byte keyLength = (byte) key.length;
            short valueLength = (short) value.length;

            ByteBuffer dataBuffer = ByteBuffer.wrap(value);
            long timestamp = valueLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();

            byteBuffer.put(keyType).put(keyLength).put(key).putLong(timestamp).putShort(valueLength)
                    .put(value);
        }
        return byteBuffer.array();
    }

    private int getBufferCapacity(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
        int size = 36;
        for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
            size += 1 + 1 + 8 + 2;
            size += entry.getKey().length;
            size += entry.getValue().length;
        }
        return size;
    }

    // getters and toString method here
}
```
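Adding up the fields that `serialize()` writes gives the exact output length, which is what `getBufferCapacity()` computes. Take a batch of $N$ entries with key lengths $k_i$ and value lengths $v_i$. The header, each entry, and the total serialized size $S$ are:

```latex
\begin{aligned}
\text{header} &= \underbrace{1}_{\text{dataCenter}} + \underbrace{1}_{\text{recordVersion}} + \underbrace{4}_{\text{entry count}} + \underbrace{4}_{\text{bufferCapacity}} + \underbrace{3 \cdot 8}_{\text{address fields}} + \underbrace{1}_{\text{recordsPartition}} + \underbrace{1}_{\text{replicated}} = 36 \\
\text{entry}_i &= \underbrace{1}_{\text{keyType}} + \underbrace{1}_{\text{keyLength}} + k_i + \underbrace{8}_{\text{timestamp}} + \underbrace{2}_{\text{valueLength}} + v_i = 12 + k_i + v_i \\
S &= 36 + \sum_{i=1}^{N} \left(12 + k_i + v_i\right) = 36 + 12N + T, \qquad T = \sum_{i=1}^{N} \left(k_i + v_i\right)
\end{aligned}
```

$T$ is exactly the `totalSize` that `validateAndSend` tracks. Keeping $T \le 50000$ therefore does not guarantee $S < 50000$. The requirement on the tracked quantities is $T + 12N < 49964$.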
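Basically, I have to make sure that whenever the `sendToDatabase` method is called, the size of the `message.serialize()` byte array is always less than 50k. My `sendToDatabase` method sends the byte array returned by the `serialize` method. Because of this requirement, I do the validation below, along with a few other things. In this method, I iterate over the `dataHolders` CLQ and extract `clientKeyBytes` and `processBytes` from it. Here is the validation I am doing: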
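- If the length of `clientKeyBytes` is greater than 255, I skip it and continue iterating.
- I keep incrementing the `totalSize` variable, which is the sum of `clientKeyLength` and `processBytesLength`. This `totalSize` should always stay below 50000 bytes.
- Once it hits the 50000 limit, I send the `clientKeyBytesAndProcessBytesHolder` map to the `sendToDatabase` method, clear the map, reset `totalSize` to 0, and start populating again.
- If it doesn't hit that limit and `dataHolders` becomes empty, it sends whatever it has.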
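The steps above track only key and value bytes. `serialize()` also writes a 36-byte header and 12 bytes of fixed fields per entry (keyType, keyLength, timestamp, valueLength), as `getBufferCapacity()` shows. A small worked example makes the gap concrete. The record sizes here are hypothetical, chosen only to land just under the 50000 threshold.

```java
import java.util.Arrays;

public class TotalSizeGapSketch {
    // Fixed header written by Message.serialize(): 1+1+4+4+8+8+8+1+1 bytes.
    static final int HEADER_BYTES = 36;
    // Per-entry overhead: keyType(1) + keyLength(1) + timestamp(8) + valueLength(2).
    static final int PER_ENTRY_OVERHEAD = 12;

    // What the loop in validateAndSend tracks: only key and value bytes.
    static int trackedSize(int[] keyLengths, int[] valueLengths) {
        int total = 0;
        for (int i = 0; i < keyLengths.length; i++) {
            total += keyLengths[i] + valueLengths[i];
        }
        return total;
    }

    // What serialize() actually produces for the same entries (same formula as getBufferCapacity).
    static int serializedSize(int[] keyLengths, int[] valueLengths) {
        return HEADER_BYTES + PER_ENTRY_OVERHEAD * keyLengths.length + trackedSize(keyLengths, valueLengths);
    }

    public static void main(String[] args) {
        // Hypothetical batch: 10 records, each with a 99-byte key and a 4900-byte value.
        int[] keys = new int[10];
        int[] values = new int[10];
        Arrays.fill(keys, 99);
        Arrays.fill(values, 4900);
        System.out.println(trackedSize(keys, values));    // 49990: passes the totalSize check
        System.out.println(serializedSize(keys, values)); // 50146: the serialized message exceeds 50000
    }
}
```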
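I believe there is a bug in my current code: my conditions may cause some records not to be sent correctly, or to be dropped somewhere, and I can't figure out where. To implement this 50k condition correctly, do I need to use the `getBufferCapacity` method to calculate the size properly before calling `sendToDatabase`?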
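One way to follow that idea without serializing twice is to track the exact size incrementally, using the same formula as `getBufferCapacity()`. The batch then starts at 36 bytes and each entry adds 12 plus its key and value lengths. The following is a minimal sketch under a few assumptions. `Holder` is a hypothetical stand-in for `DataHolder`. The limit is read as "strictly below 50000 bytes". Records that cannot fit even in an otherwise empty message are skipped, mirroring the existing `> 255` key check. Each returned map is what would be handed to `new Message(...)` and `sendToDatabase`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ExactSizeBatcher {
    static final int MAX_MESSAGE_BYTES = 50_000; // serialized output must stay strictly below this
    static final int HEADER_BYTES = 36;          // fixed header in Message.serialize()
    static final int PER_ENTRY_OVERHEAD = 12;    // keyType + keyLength + timestamp + valueLength

    // Hypothetical stand-in for DataHolder.
    static final class Holder {
        final String clientKey;
        final byte[] processBytes;

        Holder(String clientKey, byte[] processBytes) {
            this.clientKey = clientKey;
            this.processBytes = processBytes;
        }
    }

    // Drains the queue into batches whose serialized size is always below MAX_MESSAGE_BYTES.
    static List<Map<byte[], byte[]>> drain(Queue<Holder> queue) {
        List<Map<byte[], byte[]>> batches = new ArrayList<>();
        Map<byte[], byte[]> current = new LinkedHashMap<>();
        int currentSize = HEADER_BYTES;
        Holder holder;
        while ((holder = queue.poll()) != null) {
            byte[] key = holder.clientKey.getBytes(StandardCharsets.UTF_8);
            byte[] value = holder.processBytes;
            int entrySize = PER_ENTRY_OVERHEAD + key.length + value.length;
            // Skip records that can never be sent: oversized keys, or too big even for an empty message.
            if (key.length > 255 || HEADER_BYTES + entrySize >= MAX_MESSAGE_BYTES) {
                continue;
            }
            // Flush before adding an entry that would reach the limit. The batch is never empty here,
            // because any entry that passed the check above fits into an empty batch.
            if (currentSize + entrySize >= MAX_MESSAGE_BYTES) {
                batches.add(current);
                current = new LinkedHashMap<>();
                currentSize = HEADER_BYTES;
            }
            current.put(key, value);
            currentSize += entrySize;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    // Same formula as getBufferCapacity() in the question.
    static int serializedSize(Map<byte[], byte[]> batch) {
        int size = HEADER_BYTES;
        for (Map.Entry<byte[], byte[]> e : batch.entrySet()) {
            size += PER_ENTRY_OVERHEAD + e.getKey().length + e.getValue().length;
        }
        return size;
    }

    public static void main(String[] args) {
        Queue<Holder> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 25; i++) {
            queue.add(new Holder("client-" + i, new byte[4_900]));
        }
        queue.add(new Holder("huge", new byte[60_000])); // skipped: cannot fit in any message
        for (Map<byte[], byte[]> batch : drain(queue)) {
            System.out.println(batch.size() + " entries, " + serializedSize(batch) + " bytes");
        }
    }
}
```

With this input, the sketch prints three batches: 10 entries at 49236 bytes, 10 entries at 49246 bytes, and 5 entries at 24641 bytes. The 60000-byte record is dropped. `LinkedHashMap` only keeps the output order predictable; a plain `HashMap` would batch the same way.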
Comments:
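- Hmm, OK, so what do you think now? Basically, I have to make sure that whenever the `sendToDatabase` method is called, the size of the `message.serialize()` byte array is always less than 50k. For that, I think I need to account accurately for everything in the `clientKeyBytesAndProcessBytesHolder` map that is passed to the `Message` class.
- Two things that might cause problems come to mind: 1) Does each `Partition` have only one `DataHolder` with a given client key? If not, only the last stored bytes will end up in the map. 2) Is there any single `DataHolder` whose process bytes are > 50k? It would be good to know the data types in `dataHoldersByPartition`.
- Have you solved it yet?
- @MickMnemonic Not yet, I'm still stuck on this. Regarding your point 2, it is possible (though probably rare) for a single `DataHolder` to have process bytes > 50k. In that case I will just drop that record and move on to the next one. So maybe I'll add a check on the process bytes length and discard the record if it exceeds 50k. Is that right?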
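A check on the process bytes length alone would still let through some records that cannot be sent, because the header and the per-entry fields also count toward the limit. Under the layout in `serialize()`, a record fits in a message on its own only if $36 + 12 + k + v < 50000$, where $k$ and $v$ are its key and value lengths. A minimal sketch of that per-record check, with illustrative class and method names:

```java
public class OversizeRecordCheck {
    static final int MAX_MESSAGE_BYTES = 50_000; // serialized size must stay strictly below this
    static final int HEADER_BYTES = 36;          // fixed header written by Message.serialize()
    static final int PER_ENTRY_OVERHEAD = 12;    // keyType(1) + keyLength(1) + timestamp(8) + valueLength(2)

    // True if a message containing only this record would serialize to fewer than MAX_MESSAGE_BYTES.
    static boolean fitsInOneMessage(int keyLength, int valueLength) {
        return keyLength <= 255
                && HEADER_BYTES + PER_ENTRY_OVERHEAD + keyLength + valueLength < MAX_MESSAGE_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(fitsInOneMessage(10, 49_941)); // true:  36 + 12 + 10 + 49941 = 49999
        System.out.println(fitsInOneMessage(10, 49_942)); // false: 36 + 12 + 10 + 49942 = 50000
        System.out.println(fitsInOneMessage(300, 100));   // false: key longer than 255 bytes
    }
}
```

`serialize()` also writes each value length as a `short`. A value longer than 32767 bytes therefore produces a negative length field if the consumer reads it as a signed short. Whether that matters depends on the reading side, which the question does not show.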
Tags: java multithreading data-structures concurrency queue