【问题标题】:How do you list all the partition keys in a large Cassandra table in Cassandra using Spark?如何使用 Spark 在 Cassandra 的大型 Cassandra 表中列出所有分区键?
【发布时间】:2022-08-11 11:38:28
【问题描述】:

我们有一个名为 cassandra-scan 的程序,它使用 spark-cassandra-connector 列出一个非常大的表中分区键的所有值。该表有大约 1700 万个 Cassandra 分区,每个分区平均有 200 行。容纳此表的 Cassandra 集群在 6 个节点上运行 DSE 5.1.8。包含该表的键空间的复制因子是 3。

以下是键空间和表的简化定义。

CREATE KEYSPACE myspace WITH replication = {\'class\': \'SimpleStrategy\', \'replication_factor\': \'3\'}  AND durable_writes = true;

CREATE TABLE myspace.largetable (
    id text,
    itemOrder text,
    ...
    PRIMARY KEY (id, itemOrder)
) WITH CLUSTERING ORDER BY (itemOrder ASC)

cassandra-scan 中用于列出分区键的所有值的语句如下:

val res = sc.cassandraTable(keyspace, table).select(\"id\").perPartitionLimit(1).repartition(320)

我们使用 Apache Spark 2.3.1 和 spark-cassandra-connector 2.3.2。用于启动 cassandra-scan 的命令如下。

/path/to/spark/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class \"CassandraScan\" --jars /path/to/spark-cassandra-connector_2.11-2.3.2.jar --executor-memory 15g --master local[20] cassandra-scan.jar &

cassandra-scan 运行正常,大约需要 19 个小时。

我们最近建立了一个新的 Cassandra 集群,同样有 6 个节点(与第一个集群中使用的节点不同)。此集群运行 DSE 6.8.16。第一个表中的所有数据都已添加到新集群中的一个表中。

我们将 Apache Spark 的版本更新到 2.4.8,将 spark-cassandra-connector 更新到 2.4.2。我们测试了程序没有。 Spark 分区的数量在 2000 到 200,000 之间。我们无法让 cassandra-scan 正确运行。我们看到以下形式的错误:

java.io.IOException: Exception during execution of SELECT \"id\" FROM \"myspace\".\"largetable\" WHERE token(\"id\") > ? AND token(\"id\") <= ?  PER PARTITION LIMIT 1 ALLOW FILTERING: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)

一些 cassandra-scan 运行导致一些 Cassandra 节点出现故障,并在 Cassandra 日志中显示如下消息。

INFO  [CoreThread-22] 2022-04-03 06:26:35,467  InboundHandshakeHandler.java:353 - Failed to properly handshake with peer /xxx.xxx.xxx.xxx:41231. Closing the channel.
java.lang.OutOfMemoryError: Direct buffer memory
WARN  [Outbound-/xxx.xxx.xxx.xxx-large-message-writer] 2022-04-01 19:17:58,248  AbstractOutboundMessageHandler.java:80 - LARGE_MESSAGE with id 97 from /xxx.xxx.xxx.xxx to /xxx.xxx.xxx.xxx via (/xxx.xxx.xxx.xxx,/xxx.xxx.xxx.xxx:7000) error...
java.io.IOException: java.lang.RuntimeException: io.netty.channel.unix.Errors$NativeIoException: writeAddress(..) failed: Connection reset by peer

非常感谢任何有助于使其正常工作的帮助。谢谢。

  • 你配置的timeout是什么? ALLOW FILTERING 查询可能非常慢。
  • datastax.java.driver.basic.request.timeout 的默认值为 2 秒。 cassandra.yaml 中的 read_request_timeout_in_ms 设置为 30000。看起来 ALLOW FILTERING 正在自动添加到某处。源中的查询不包含“ALLOW FILTERING”。尝试从 Java 应用程序 \"SE:LECT DISTINCT id FROM ...\" 进行简单的 CQL 查询,页面大小设置为 100,basic.request.timeout 增加到 30 秒。这似乎有效,但速度很慢。
  • 很可能 ALLOW FILTERING 是由 spark 添加的,因为它是跨分区请求。这个查询很慢,因为......嗯,它是对这种大小的数据的缓慢操作。您可能需要以某种方式分别处理这些 ID。

标签: cassandra spark-cassandra-connector


【解决方案1】:

此错误表明集群中至少有一个节点无法为请求提供服务:

    Not enough replicas available for query at consistency LOCAL_ONE \
      (1 required but only 0 alive)

您需要查看 Cassandra 日志以确定 (1) 哪些节点无响应/不可用,以及 (2) 原因。干杯!

【讨论】:

  • 嗨,埃里克 :) 我不认为是这样,他只是因为查询速度慢而没时间。
【解决方案2】:

我们使用DataStax Bulk Loader 来解决 问题。

dsbulk unload \
  --connector.csv.url <path>/<to>/<outputDir> \
  -h <host> \
  -query "select distinct id from myspace.largetable"

dsbulk 花了大约 3 个小时来获得 1750 万个值。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-07-18
    • 1970-01-01
    • 2016-12-06
    • 2017-03-18
    • 2018-04-22
    • 2013-09-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多