【发布时间】:2022-08-11 11:38:28
【问题描述】:
我们有一个名为 cassandra-scan 的程序,它使用 spark-cassandra-connector 列出一个非常大的表中分区键的所有值。该表有大约 1700 万个 Cassandra 分区,每个分区平均有 200 行。容纳此表的 Cassandra 集群在 6 个节点上运行 DSE 5.1.8。包含该表的键空间的复制因子是 3。
以下是键空间和表的简化定义。
CREATE KEYSPACE myspace WITH replication = {\'class\': \'SimpleStrategy\', \'replication_factor\': \'3\'} AND durable_writes = true;
CREATE TABLE myspace.largetable (
id text,
itemOrder text,
...
PRIMARY KEY (id, itemOrder)
) WITH CLUSTERING ORDER BY (itemOrder ASC)
cassandra-scan 中用于列出分区键的所有值的语句如下:
val res = sc.cassandraTable(keyspace, table).select(\"id\").perPartitionLimit(1).repartition(320)
我们使用 Apache Spark 2.3.1 和 spark-cassandra-connector 2.3.2。用于启动 cassandra-scan 的命令如下。
/path/to/spark/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class \"CassandraScan\" --jars /path/to/spark-cassandra-connector_2.11-2.3.2.jar --executor-memory 15g --master local[20] cassandra-scan.jar &
cassandra-scan 运行正常,大约需要 19 个小时。
我们最近建立了一个新的 Cassandra 集群,同样有 6 个节点(与第一个集群中使用的节点不同)。此集群运行 DSE 6.8.16。第一个表中的所有数据都已添加到新集群中的一个表中。
我们将 Apache Spark 的版本更新到 2.4.8,将 spark-cassandra-connector 更新到 2.4.2。我们测试了程序没有。 Spark 分区的数量在 2000 到 200,000 之间。我们无法让 cassandra-scan 正确运行。我们看到以下形式的错误:
java.io.IOException: Exception during execution of SELECT \"id\" FROM \"myspace\".\"largetable\" WHERE token(\"id\") > ? AND token(\"id\") <= ? PER PARTITION LIMIT 1 ALLOW FILTERING: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)
一些 cassandra-scan 运行导致一些 Cassandra 节点出现故障,并在 Cassandra 日志中显示如下消息。
INFO [CoreThread-22] 2022-04-03 06:26:35,467 InboundHandshakeHandler.java:353 - Failed to properly handshake with peer /xxx.xxx.xxx.xxx:41231. Closing the channel.
java.lang.OutOfMemoryError: Direct buffer memory
WARN [Outbound-/xxx.xxx.xxx.xxx-large-message-writer] 2022-04-01 19:17:58,248 AbstractOutboundMessageHandler.java:80 - LARGE_MESSAGE with id 97 from /xxx.xxx.xxx.xxx to /xxx.xxx.xxx.xxx via (/xxx.xxx.xxx.xxx,/xxx.xxx.xxx.xxx:7000) error...
java.io.IOException: java.lang.RuntimeException: io.netty.channel.unix.Errors$NativeIoException: writeAddress(..) failed: Connection reset by peer
非常感谢任何有助于使其正常工作的帮助。谢谢。
-
你配置的
timeout是什么?ALLOW FILTERING查询可能非常慢。 -
datastax.java.driver.basic.request.timeout 的默认值为 2 秒。 cassandra.yaml 中的 read_request_timeout_in_ms 设置为 30000。看起来 ALLOW FILTERING 正在自动添加到某处。源中的查询不包含“ALLOW FILTERING”。尝试从 Java 应用程序 \"SE:LECT DISTINCT id FROM ...\" 进行简单的 CQL 查询,页面大小设置为 100,basic.request.timeout 增加到 30 秒。这似乎有效,但速度很慢。
-
很可能 ALLOW FILTERING 是由 spark 添加的,因为它是跨分区请求。这个查询很慢,因为......嗯,它是对这种大小的数据的缓慢操作。您可能需要以某种方式分别处理这些 ID。
标签: cassandra spark-cassandra-connector