【发布时间】:2018-03-06 02:14:44
【问题描述】:
我有一堆键 (clientKey) 和值 (processBytes) 我想通过将它们打包到一个字节数组中来发送到我们的消息队列。我将创建一个字节数组,包含所有应该始终小于 50K 的键和值,然后发送到我们的消息队列。
对于每个分区,我都有一堆dataHolders,所以我正在迭代它们,然后将其发送到我的消息队列:-
private void validateAndSend(final DataPartition partition) {
final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
// sending data via async policy but it can be send with other two sync queue policy as well.
final Packet packet = new Packet(partition, new QPolicyAsync());
DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null) {
packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
dataHolder.getProcessBytes());
}
packet.close();
}
Packet类:该类将所有键和值打包到一个字节数组中,并调用构造函数中传递的相应实现将数据发送到队列。
public final class Packet implements Closeable {
private static final int MAX_SIZE = 50000;
private static final int HEADER_SIZE = 36;
private final byte dataCenter;
private final byte recordVersion;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte partition;
private final byte replicated;
private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
private final QueuePolicy policy;
private int pendingItems = 0;
public Packet(final DataPartition partition, final QueuePolicy policy) {
this.partition = (byte) partition.getPartition();
this.policy = policy;
this.dataCenter = Utils.LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
final long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
}
private void addHeader(final ByteBuffer buffer, final int items) {
buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
.putLong(address).putLong(addressFrom).putLong(addressOrigin).put(partition)
.put(replicated);
}
// sending here by calling policy implementation
private void sendData() {
if (itemBuffer.position() == 0) {
// no data to be sent
return;
}
final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
addHeader(buffer, pendingItems);
buffer.put(itemBuffer);
// sending data via particular policy
policy.sendToQueue(address, buffer.array());
itemBuffer.clear();
pendingItems = 0;
}
public void addAndSendJunked(final byte[] key, final byte[] data) {
if (key.length > 255) {
return;
}
final byte keyLength = (byte) key.length;
final byte dataLength = (byte) data.length;
final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
final int newSize = itemBuffer.position() + additionalSize;
if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
sendData();
}
if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
throw new AppConfigurationException("Size of single item exceeds maximum size");
}
final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
// data layout
itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
.put(data);
pendingItems++;
}
@Override
public void close() {
if (pendingItems > 0) {
sendData();
}
}
}
现在我可以通过三种不同的方式将数据发送到我的消息队列,因此我创建了一个接口,然后拥有三种不同的实现:
QueuePolicy接口:
public interface QueuePolicy {
public boolean sendToQueue(final long address, final byte[] encodedRecords);
}
QPolicyAsync类:
public class QPolicyAsync implements QueuePolicy {
@Override
public boolean sendToQueue(long address, byte[] encodedRecords) {
return SendRecord.getInstance().sendToQueueAsync(address, encodedRecords);
}
}
QPolicySync类:
public class QPolicySync implements QueuePolicy {
@Override
public boolean sendToQueue(long address, byte[] encodedRecords) {
return SendRecord.getInstance().sendToQueueSync(address, encodedRecords);
}
}
QPolicySyncWithSocket类:
public class QPolicySyncWithSocket implements QueuePolicy {
private final Socket socket;
public QPolicySyncWithSocket(Socket socket) {
this.socket = socket;
}
@Override
public boolean sendToQueue(long address, byte[] encodedRecords) {
return SendRecord.getInstance().sendToQueueSync(address, encodedRecords, Optional.of(socket));
}
}
这个想法很简单:我通过这三个QueuePolicy 实现中的任何一个将数据发送到我的消息传递队列。这取决于客户希望如何发送数据。到目前为止,我正在 Packet 构造函数中传递 QueuePolicy 的实现,然后通过该策略发送数据。每个QueuePolicy实现调用SendRecord类中对应的方法。
现在我需要知道数据是否成功发送。截至目前,Packet 类中的方法没有返回任何布尔值,所以我不知道它是否已成功发送。我可能遇到dataHolders 中只有一个元素或其中可以有多个元素的情况。
private void validateAndSend(final DataPartition partition) {
final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
// sending data via async policy but it can be send with other two sync queue policy as well.
final Packet packet = new Packet(partition, new QPolicyAsync());
DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null) {
packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
dataHolder.getProcessBytes());
}
packet.close();
// how do I know whether this data was successfully sent?
}
如果我从 Packet 类中的 addAndSendJunked 和 close 方法返回布尔值,那么我需要依赖哪个布尔值?因为这两种方法中的任何一种都可以发送数据。
-
close方法将发送数据,要么其中只有一个元素,要么有剩余元素。 -
addAndSendJunked方法将在达到限制后立即发送数据。
更新:
下面是我更新的代码:
public final class Packet implements Closeable {
private static final int MAX_SIZE = 50000;
private static final int HEADER_SIZE = 36;
private boolean result = false;
private final byte dataCenter;
private final byte recordVersion;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte partition;
private final byte replicated;
private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
private final QueuePolicy policy;
private int pendingItems = 0;
public Packet(final DataPartition partition, final QueuePolicy policy) {
this.partition = (byte) partition.getPartition();
this.policy = policy;
this.dataCenter = Utils.LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
final long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
}
private void addHeader(final ByteBuffer buffer, final int items) {
buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
.putLong(address).putLong(addressFrom).putLong(addressOrigin).put(partition)
.put(replicated);
}
// sending here by calling policy implementation
private void flush() {
if (itemBuffer.position() == 0) {
// no data to be sent
return true;
}
final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
addHeader(buffer, pendingItems);
buffer.put(itemBuffer);
// sending data via particular policy
boolean sent = policy.sendToQueue(address, buffer.array());
itemBuffer.clear();
pendingItems = 0;
return sent;
}
public void addAndSendJunked(final byte[] key, final byte[] data) {
if (key.length > 255) {
result = false;
return;
}
final byte keyLength = (byte) key.length;
final byte dataLength = (byte) data.length;
final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
final int newSize = itemBuffer.position() + additionalSize;
if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
result = flush();
}
if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
throw new AppConfigurationException("Size of single item exceeds maximum size");
}
final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
// data layout
itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
.put(data);
pendingItems++;
}
@Override
public void close() {
if (pendingItems > 0) {
result = flush();
}
}
public boolean getResult() {
return result;
}
}
【问题讨论】:
标签: java algorithm oop design-patterns