【问题标题】:Using a Datastax Cassandra ResultSet with Java 8 Parallel Streams - Quickly使用带有 Java 8 并行流的 Datastax Cassandra 结果集 - 快速
【发布时间】:2016-05-15 11:49:24
【问题描述】:

我正在使用 Datastax 驱动程序从 Cassandra 获取大量行,我需要尽快处理它们。

我已经研究过使用List::parallelStream().forEach(),起初看起来很棒,因为ResultSet 的作用很像List,但遗憾的是我无法直接在ResultSet 上使用parallelStream()。为了让它工作,我首先必须使用 ResultSet::all() 这真的很慢 - 我假设它会迭代每个元素。

ResultSet rs = this.getResultSet(); // Takes <1 second

// Convert the ResultSet to a list so as I can use parallelStream().
List<Row> rsList = rs.all(); // Takes 21 seconds

rsList.parallelStream().forEach(this::processRow); // Takes 3 seconds

有没有更快的方法可以处理结果集的每一行?

【问题讨论】:

  • 出于好奇,rList的顺序处理需要多少时间?查看您的时序图,您的瓶颈不是行的处理,而是它们的检索。所以并行化行处理会给你一个相当小的改进。在我看来,您似乎应该考虑优化您的查询/模式/集群设置/网络。
  • 我来自 PHP 背景,所以我习惯于在调用 execute() 后包含行的结果集。这里不是这样吗?
  • 与您使用的驱动程序无关,问题是一样的:您通过驱动程序向 C* 提交查询; C* 处理查询并计算结果集的行; C* 将结果行发送回驱动程序(在 @doanduyhai 指出的页面中)。一旦驱动程序使第一行可用,客户端就可以开始处理行(one() 返回第一行)。因此,您可以在 C* 发送更多行的同时开始并行处理行,从而有效地并行化该过程。但总的来说,您永远不会比 C* 向您发送结果的最后一行所需的时间更快。
  • 您的意思是,C* 发送到行所花费的时间是这里的瓶颈?我想解决这个问题的唯一方法是更改​​架构和查询?
  • 是的,完全正确。您为rs.all() 测量的 21 秒是 C* 计算和传递构成结果集的行所花费的累积时间,包括与有线格式之间的转换。处理结果集只需要 3 秒,如果您现在通过调用 rs.all() 等待所有行交付,而是在第一行可用时立即开始处理行,则可能对整体处理时间的影响几乎可以忽略不计(参见“流水线”)。因此,开始优化您花费最多时间的地方。哪个在 C* 端。

标签: java list cassandra resultset datastax


【解决方案1】:

要让它工作,我首先必须使用 ResultSet::all() 确实很慢

ResultSet.all() 将使用服务器端 分页 获取 所有行。您可以使用statement.setFetchSize()控制页面大小

有没有更快的方法可以处理结果集的每一行?

这取决于您的查询,它是什么?如果您正在执行全分区扫描,那么只有几台机器在做这项工作,但如果您要从多个分区获取数据,您可以尝试使用多个查询并行化它们,每个分区一个查询

【讨论】:

    【解决方案2】:

    你可以试试这个:

    ResultSet rs = this.getResultSet(); // Takes <1 second
    
    StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(
                    rs.iterator(), Spliterator.ORDERED), false)
           .parallel().forEach(this::processRow);
    

    省略对rs.all()的调用

    希望如果ResultSet 允许立即开始迭代,您将能够更早地并行化处理。

    更新

    检查ResultSet的来源后,我看到的是:

    方法 all() 创建一个新的 ArrayList 并填充它,在您的情况下这需要 21 秒

    List<Row> result = new ArrayList<Row>(rows.size());
    for (Row row : this)
        result.add(row);
    

    在迭代器中实现的方法next() 改为轮询行队列

    public Row next() {
        return Row.fromData(metadata, rows.poll());
    }
    

    这意味着数据处理无需等待 21 秒即可开始处理第一行。

    【讨论】:

    • 实施您的第一个建议仍然需要 >20 秒来处理行。似乎在后端,线程无法同时从 ResultSet 中读取。
    • 我明白了。很抱歉听到这个消息。这不是处理中的并行性问题。我确定它工作正常。它只是从数据库中获取行太慢了。根据阿姆达尔定律en.wikipedia.org/wiki/Amdahl%27s_law,当链式计算的并行部分相对于顺序计算而言较小时,整体加速很小。
    【解决方案3】:

    与作者描述的结果几乎相同。 我的解决方案是将 FetchSize 设置为更大的值。正如我所读的,默认值为 5000。获取所有并对其进行迭代对我来说大约需要 25 秒。使用 .setFetchSize(50000) 迭代需要 0.8 秒。我什至还不相信。使用简单的 foreach 循环进行迭代

    我的代码:

    String sql = "...."
    prepearedSql = session.prepare(sql);
    Statement statement = prepearedSql.bind().setValues(...).setFetchSize(50000);
    ResultSet result = session.execute(statement);
    for (Row row : result)
        {...
    

    【讨论】:

      猜你喜欢
      • 2018-10-10
      • 1970-01-01
      • 2013-10-31
      • 2016-08-22
      • 2015-07-23
      • 2019-08-26
      • 2014-06-11
      • 2016-03-21
      • 2018-01-18
      相关资源
      最近更新 更多