【发布时间】:2021-11-21 05:26:34
【问题描述】:
我有一个 16 节点集群,在我使用 Spark-Cassandra 连接器 3.0.0 时,每个节点都安装了 Spark 和 Cassandra。 Spark 集群有 16 个执行器,每个执行器有 2 个核心,总共 32 个核心。我在 Cassandra 数据库中有大约 22 亿行(也是主键),总共有 4.827 个唯一分区键。我正在使用数据帧/数据集,代码在 Java 中,而我也在 spark 配置中使用 .config("spark.sql.shuffle.partitions",96)。在代码中,我选择所有 22 亿行并加入分区键。
-
在 Spark GUI 中,我看到有一个包含 32 个任务的广播,这意味着使用了 Sparks Join,而 32 个任务是因为可用的内核。这是否意味着最初将创建 32 个 Spark 分区,这 22 亿行将驻留?
-
在使用 Join 之前,我一定要使用 .repartitionByCassandraReplica 吗?我不相信它是必需的,但事实是,如果我尝试使用它,我会收到“找不到符号”的错误。此外,当我的分区键少于 2600 个时,会激活 DirectJoin。
我的目标是利用数据局部性并避免数据传输。
编辑 1
对于问题 1,我浏览了您发送的链接,正如您所说,大小基于 system.size_estimates 表中的任何内容。
- 根据 nodetool 我有 16 个节点 x ~8.9Gb = 143Gb,复制因子为 3,因此 143/3 = 47.6Gb。所以根据公式,必须有大约 47600/64 = ~744 个 spark 分区。
- 但是根据 system.size_estimates 表,partitions_count 列的总和为 1883 个分区,mean_partition_size 为 48042720。这意味着表大小为 1883 x 48Mb = 90384Mb 或 ~90Gb,与 143Gb 有点远。
对于问题 2,我的 Cassandra 表是:
CREATE TABLE experiment(
experimentid varchar,
description text,
rt float,
intensity float,
mz float,
identifier text,
chemical_formula text,
filename text,
PRIMARY KEY ((experimentid),description, rt, intensity, mz, identifier, chemical_formula, filename));
火花代码是:
Dataset<Row> dfexplist = sp.createDataset(experimentlist, Encoders.STRING()).toDF("experimentid");
Dataset<Row> metlistinitial = sp.read().format("org.apache.spark.sql.cassandra")
.options(new HashMap<String, String>() {
{
put("keyspace", "mdb");
put("table", "experiment");
}
})
.load().select(col("experimentid"), col("description"), col("intensity")).join(dfexplist, "experimentid").repartition(col("experimentid"));
这是否实现了数据局部性?在我加入时或之前是否有洗牌?最后,我根据分区键重新分区,以避免以后计算中的任何洗牌。
【问题讨论】:
标签: java apache-spark cassandra spark-cassandra-connector data-partitioning