【问题标题】:Cassandra Async reads and writes, Best practicesCassandra 异步读写,最佳实践
【发布时间】:2018-11-24 07:50:08
【问题描述】:

要设置上下文, 我们在 cassandra 中有 4 个表,其中 4 个是数据表,剩下的一个是搜索表(假设 DATA、SEARCH1、SEARCH2 和 SEARCH3 是表)。

我们有一个初始加载要求,一个请求中最多有 15k 行,用于 DATA 表,因此要保持搜索表同步。 我们以批量插入的方式执行此操作,每个 bacth 作为 4 个查询(每个表一个)以保持一致性。

但是对于每个批次,我们都需要读取数据。如果存在,只更新 DATA 表的 lastUpdatedDate 列,否则插入到所有 4 个表中。

下面是代码sn-p我们是怎么做的:

public List<Items> loadData(List<Items> items) {
    CountDownLatch latch = new CountDownLatch(items.size());
    ForkJoinPool pool = new ForkJoinPool(6);
    pool.submit(() -> items.parallelStream().forEach(item -> {
      BatchStatement batch = prepareBatchForCreateOrUpdate(item);
      batch.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
      ResultSetFuture future = getSession().executeAsync(batch);
      Futures.addCallback(future, new AsyncCallBack(latch), pool);
    }));

    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }

    //TODO Consider what to do with the failed Items, Retry? or remove from the items in the return type
    return items;
}

private BatchStatement prepareBatchForCreateOrUpdate(Item item) {
    BatchStatement batch = new BatchStatement();
    Item existingItem = getExisting(item) //synchronous read
    if (null != data) {
      existingItem.setLastUpdatedDateTime(new Timestamp(System.currentTimeMillis()));
      batch.add(existingItem));
      return batch;
    }

    batch.add(item);
    batch.add(convertItemToSearch1(item));
    batch.add(convertItemToSearch2(item));
    batch.add(convertItemToSearch3(item));

    return batch;
  }

class AsyncCallBack implements FutureCallback<ResultSet> {
    private CountDownLatch latch;

    AsyncCallBack(CountDownLatch latch) {
      this.latch = latch;
    }

    // Cooldown the latch for either success or failure so that the thread that is waiting on latch.await() will know when all the asyncs are completed.
    @Override
    public void onSuccess(ResultSet result) {
      latch.countDown();
    }

    @Override
    public void onFailure(Throwable t) {
      LOGGER.warn("Failed async query execution, Cause:{}:{}", t.getCause(), t.getMessage());
      latch.countDown();
    }
  }

考虑到网络往返 b/w 应用程序和 cassandra 集群,15k 个项目的执行大约需要 1.5 到 2 分钟(两者都驻留在相同的 DNS 上,但 kubernetes 上的 pod 不同)

我们有想法使读取调用 getExisting(item) 也异步,但处理失败情况变得越来越复杂。 cassandra 的数据加载是否有更好的方法(仅考虑通过 datastax 企业 java 驱动程序的 Async wites)。

【问题讨论】:

    标签: asynchronous cassandra datastax-enterprise datastax-java-driver


    【解决方案1】:

    第一件事——Cassandra 中的批处理不同于关系数据库。通过使用它们,您会给集群带来更多负载。

    关于使一切异步,我考虑了以下可能性:

    1. 对数据库进行查询,获取Future 并向其添加侦听器 - 将在查询完成时执行(覆盖onSuccess);
    2. 通过该方法,您可以根据从 Cassandra 获得的结果安排下一个操作的执行。

    您需要确保检查的一件事是,您不会同时发出过多的并发请求。在协议的第 3 版中,每个连接最多可以有 32k 个正在进行的请求,但在您的情况下,您最多可以发出 60k (4x15k) 个请求。我正在使用following wrapper around Session class 来限制进行中请求的数量。

    【讨论】:

    • 感谢 Alex 的回答,是的,cassandra 中的批次是另一回事,这就是为什么我们对 15k 条记录使用单独的批次,每个批次都有一个查询来更新每个表以保持数据同步。我喜欢你的会话限制示例。 :)
    • 我意识到这个评论迟了——但是,假设你的插入是幂等的,我会做单独的异步写入,具有 QUORUM/ALL 一致性并重试失败。就数据一致性而言,这应该为您提供可接受的容错程度。
    • 是的。幂等查询+重试很有用
    • 附带说明,SessionLimiter 代码很好——我们必须处理限制 Cassandra 集群中的动态消息的问题,我们通过批处理解决了这个问题(原始但有效)
    • @user1694845 批处理的主要问题是,如果它们不在同一个分区中,它们会非常慢。 SessionLimiter 有一个问题,它不是最优的——它不知道每个连接的限制,而是限制每个集群。 Java 驱动 4.x 已内置节流...
    猜你喜欢
    • 2010-10-09
    • 1970-01-01
    • 2011-08-12
    • 2013-02-02
    • 1970-01-01
    • 2018-12-26
    • 1970-01-01
    • 2020-08-26
    • 1970-01-01
    相关资源
    最近更新 更多