【问题标题】:Send records to messaging queue using either of one policy使用任一策略将记录发送到消息传递队列
【发布时间】:2018-03-06 02:14:44
【问题描述】:

我有一堆键 (clientKey) 和值 (processBytes) 我想通过将它们打包到一个字节数组中来发送到我们的消息队列。我将创建一个字节数组,包含所有应该始终小于 50K 的键和值,然后发送到我们的消息队列。

对于每个分区,我都有一堆dataHolders,所以我正在迭代它们,然后将其发送到我的消息队列:-

private void validateAndSend(final DataPartition partition) {
  final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);

  // sending data via async policy but it can be send with other two sync queue policy as well.
  final Packet packet = new Packet(partition, new QPolicyAsync());

  DataHolder dataHolder;
  while ((dataHolder = dataHolders.poll()) != null) {
    packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
        dataHolder.getProcessBytes());
  }
  packet.close();
}

Packet类:该类将所有键和值打包到一个字节数组中,并调用构造函数中传递的相应实现将数据发送到队列。

public final class Packet implements Closeable {
  private static final int MAX_SIZE = 50000;
  private static final int HEADER_SIZE = 36;

  private final byte dataCenter;
  private final byte recordVersion;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte partition;
  private final byte replicated;
  private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
  private final QueuePolicy policy;
  private int pendingItems = 0;

  public Packet(final DataPartition partition, final QueuePolicy policy) {
    this.partition = (byte) partition.getPartition();
    this.policy = policy;
    this.dataCenter = Utils.LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    final long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  private void addHeader(final ByteBuffer buffer, final int items) {
    buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
        .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(partition)
        .put(replicated);
  }

  // sending here by calling policy implementation
  private void sendData() {
    if (itemBuffer.position() == 0) {
      // no data to be sent
      return;
    }
    final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
    addHeader(buffer, pendingItems);
    buffer.put(itemBuffer);
    // sending data via particular policy
    policy.sendToQueue(address, buffer.array());
    itemBuffer.clear();
    pendingItems = 0;
  }

  public void addAndSendJunked(final byte[] key, final byte[] data) {
    if (key.length > 255) {
      return;
    }
    final byte keyLength = (byte) key.length;
    final byte dataLength = (byte) data.length;

    final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
    final int newSize = itemBuffer.position() + additionalSize;
    if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
      sendData();
    }
    if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
      throw new AppConfigurationException("Size of single item exceeds maximum size");
    }

    final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
    final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
    // data layout
    itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
        .put(data);
    pendingItems++;
  }

  @Override
  public void close() {
    if (pendingItems > 0) {
      sendData();
    }
  }
}

现在我可以通过三种不同的方式将数据发送到我的消息队列,因此我创建了一个接口,然后拥有三种不同的实现:

QueuePolicy接口

public interface QueuePolicy {
    public boolean sendToQueue(final long address, final byte[] encodedRecords);
}

QPolicyAsync

public class QPolicyAsync implements QueuePolicy {

  @Override
  public boolean sendToQueue(long address, byte[] encodedRecords) {
    return SendRecord.getInstance().sendToQueueAsync(address, encodedRecords);
  }
}

QPolicySync

public class QPolicySync implements QueuePolicy {

  @Override
  public boolean sendToQueue(long address, byte[] encodedRecords) {
    return SendRecord.getInstance().sendToQueueSync(address, encodedRecords);
  }
}

QPolicySyncWithSocket

public class QPolicySyncWithSocket implements QueuePolicy {
  private final Socket socket;

  public QPolicySyncWithSocket(Socket socket) {
    this.socket = socket;
  }

  @Override
  public boolean sendToQueue(long address, byte[] encodedRecords) {
    return SendRecord.getInstance().sendToQueueSync(address, encodedRecords, Optional.of(socket));
  }
}

这个想法很简单:我通过这三个QueuePolicy 实现中的任何一个将数据发送到我的消息传递队列。这取决于客户希望如何发送数据。到目前为止,我正在 Packet 构造函数中传递 QueuePolicy 的实现,然后通过该策略发送数据。每个QueuePolicy实现调用SendRecord类中对应的方法。


现在我需要知道数据是否成功发送。截至目前,Packet 类中的方法没有返回任何布尔值,所以我不知道它是否已成功发送。我可能遇到dataHolders 中只有一个元素或其中可以有多个元素的情况。

private void validateAndSend(final DataPartition partition) {
  final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);

  // sending data via async policy but it can be send with other two sync queue policy as well.
  final Packet packet = new Packet(partition, new QPolicyAsync());

  DataHolder dataHolder;
  while ((dataHolder = dataHolders.poll()) != null) {
    packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
        dataHolder.getProcessBytes());
  }
  packet.close();
  // how do I know whether this data was successfully sent?

}

如果我从 Packet 类中的 addAndSendJunkedclose 方法返回布尔值,那么我需要依赖哪个布尔值?因为这两种方法中的任何一种都可以发送数据。

  • close 方法将发送数据,要么其中只有一个元素,要么有剩余元素。
  • addAndSendJunked 方法将在达到限制后立即发送数据。

更新:

下面是我更新的代码:

public final class Packet implements Closeable {
  private static final int MAX_SIZE = 50000;
  private static final int HEADER_SIZE = 36;
  private boolean result = false;

  private final byte dataCenter;
  private final byte recordVersion;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte partition;
  private final byte replicated;
  private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
  private final QueuePolicy policy;
  private int pendingItems = 0;

  public Packet(final DataPartition partition, final QueuePolicy policy) {
    this.partition = (byte) partition.getPartition();
    this.policy = policy;
    this.dataCenter = Utils.LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    final long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  private void addHeader(final ByteBuffer buffer, final int items) {
    buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
        .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(partition)
        .put(replicated);
  }

  // sending here by calling policy implementation
  private void flush() {
    if (itemBuffer.position() == 0) {
      // no data to be sent
      return true;
    }
    final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
    addHeader(buffer, pendingItems);
    buffer.put(itemBuffer);
    // sending data via particular policy
    boolean sent = policy.sendToQueue(address, buffer.array());
    itemBuffer.clear();
    pendingItems = 0;
    return sent;
  }

  public void addAndSendJunked(final byte[] key, final byte[] data) {
    if (key.length > 255) {
      result = false;
      return;
    }
    final byte keyLength = (byte) key.length;
    final byte dataLength = (byte) data.length;

    final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
    final int newSize = itemBuffer.position() + additionalSize;
    if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
      result = flush();
    }
    if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
      throw new AppConfigurationException("Size of single item exceeds maximum size");
    }

    final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
    final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
    // data layout
    itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
        .put(data);
    pendingItems++;
  }

  @Override
  public void close() {
    if (pendingItems > 0) {
      result = flush();
    }
  }

  public boolean getResult() {
    return result;
  }
}

【问题讨论】:

    标签: java algorithm oop design-patterns


    【解决方案1】:

    您不能从 close() 方法返回布尔值,因为它已被覆盖。

    你有不同的选择:

    1. 每当发送返回 false 时抛出异常
    2. sendData 返回一个布尔值,我将把它重命名为flush 并公开(见下文),并从addAndSendJunked 返回一个布尔值。
    3. Packet 类中添加一个布尔字段和一个 getter,以便能够随时获取其值

    方法flush

    public boolean flush() {
        if (itemBuffer.position() == 0) {
            // no data to be sent
            return true;
        }
        final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
        addHeader(buffer, pendingItems);
        buffer.put(itemBuffer);
        // sending data via particular policy
        boolean result = policy.sendToQueue(address, buffer.array());
        itemBuffer.clear();
        pendingItems = 0;
        return result;
    }
    

    【讨论】:

    • 你能告诉我为什么flush方法需要在这里公开吗?就我而言,没有人从Packet 类之外调用该方法。我也用你的建议更新了这个问题。你认为这是对的还是我也可以做一些其他的改变?我从 flush 方法返回一个布尔值,并在 Packet 类及其 getter 中有一个布尔变量。
    • @user1950349 flush 是公共的,因为它在不能返回布尔值的close 中使用。如果您想知道发送结果,应该直接调用flush
    猜你喜欢
    • 2020-01-07
    • 1970-01-01
    • 2010-11-23
    • 2013-11-15
    • 1970-01-01
    • 2016-05-02
    • 1970-01-01
    • 2021-09-26
    • 1970-01-01
    相关资源
    最近更新 更多