【问题标题】:How to batch select data from Cassandra effectively?如何有效地从 Cassandra 批量选择数据?
【发布时间】:2017-04-08 23:53:06
【问题描述】:

我知道 Cassandra 不支持批量查询,也不推荐使用IN,因为它会降低性能。但是我必须通过id来获取数据,例如:

select * from visit where id in ([visit_id array])

描述表:

CREATE TABLE visit (
    enterprise_id int,
    id text,
    ........
    PRIMARY KEY (enterprise_id, id)

数组可能有数千个元素。有什么办法可以有效地做到吗?

【问题讨论】:

  • 向我们展示您的表架构
  • @AshrafulIslam 请看我的更新

标签: cassandra cql


【解决方案1】:

Large In 查询创建 GC 暂停和堆压力,导致整体性能下降。当您执行大量查询时,这意味着您正在等待这个单一的协调器节点给您一个响应,它将所有这些查询及其响应保存在堆中,如果其中一个查询失败,或者协调器失败,您有重试整个过程。

方法一:

尝试将您的 in 查询转换为范围查询 (>=,

SELECT * visit WHERE enterprise_id = ? and id >= ? and id <= ?

方法二:

使用 executeAsync,Java 示例

PreparedStatement statement = session.prepare("SELECT * FROM visit where enterprise_id = ? and id = ?");

List<ResultSetFuture> futures = new ArrayList<>();
for (int i = 1; i < 4; i++) {
    ResultSetFuture resultSetFuture = session.executeAsync(statement.bind(i, i));
    futures.add(resultSetFuture);
}

List<String> results = new ArrayList<>();
for (ResultSetFuture future : futures){
     ResultSet rows = future.getUninterruptibly();
     Row row = rows.one();
     results.add(row.getString("name"));
}
return results; 

方法三:

如果可能的话,而不是在查询中,创建另一个表,当您将在查询中执行的数据即将插入或更新时也将数据插入到新表中,那么您可以只从新表中查询而无需在查询中

来源:
http://www.datastax.com/dev/blog/a-deep-look-to-the-cql-where-clause https://lostechies.com/ryansvihla/2014/09/22/cassandra-query-patterns-not-using-the-in-query-for-multiple-partitions/

【讨论】:

  • 感谢您的解释,帮了大忙!
  • 方法 1 不起作用,因为 id 是字符串而不是 int,但方法 2 是一个好点。
【解决方案2】:

发出此类查询的首选方式是展开 IN 部分。这仅仅意味着您需要并行发出多个查询,因为 token-o-matic(又名令牌感知)驱动程序会将每个查询视为一个独立的查询,然后将这些查询传播到不同的节点,使每个单个节点成为负责将要到达的每个查询的协调器。

您应该最多运行 X 个查询并等待其中至少一个完成(我使用 Java):

final int X = 1000;
ArrayList<ResultSetFuture> futures = new ArrayList<>();
ArrayList<ResultSet> results = new ArrayList<>();
for (int i = 0; i < allTheRowsINeedToFetch; i++) {
    futures.add(session.executeAsync(myBeautifulPreparedStatement.bind(xxx,yyy,zzz)));
    while (futures.size() >= X || (futures.size() > 0 && futures.get(0).isDone())) {
        ResultSetFuture rsf = futures.remove(0);
        results.add(rsf.getUninterruptibly());
    }
}

while (futures.size() > 0) {
    ResultSetFuture rsf = futures.remove(0);
    results.add(rsf.getUninterruptibly());
}

// Now use the results

这被称为背压,它用于将压力从集群移动客户端

这种方法的好处是您可以真正并行 (X = allTheRowsINeedToFetch),也可以真正串行 (X = 1),一切都在之间仅取决于您的集群硬件。 X 的低值意味着您没有使用集群功能足够,高值意味着您会遇到麻烦,因为您会开始看到超时。所以,你真的需要调整它。

【讨论】:

  • 在这段代码中,X 是最大并发,但代码不能确保我们确实有 X 个进行中的请求。例如,如果我们有 1000 个请求,X=10,第一个请求需要 1 秒,所有其他请求每个需要 1 毫秒,那么它将产生 10 个请求,然后等待约 1 秒以使第一个请求完成,即使剩下的 9 个“插槽”已经可用。
  • 当然,但我认为这不太可能并且不是问题:如果第一个查询(或第 7 个成为第一个)需要 1 秒,那么您的集群上发生了一些事情,所以暂停一下似乎恕我直言,最好的事情。此外,针对“真正的”并行进行优化会使代码(很多?)更加复杂。不值得……
猜你喜欢
  • 2016-04-14
  • 1970-01-01
  • 1970-01-01
  • 2019-07-13
  • 1970-01-01
  • 1970-01-01
  • 2016-08-22
  • 1970-01-01
  • 2014-12-03
相关资源
最近更新 更多