【问题标题】:Spark-Cassandra Connector -- Spark and Cassandra partitions -- data localitySpark-Cassandra 连接器——Spark 和 Cassandra 分区——数据局部性
【发布时间】:2021-11-21 05:26:34
【问题描述】:

我有一个 16 节点集群,在我使用 Spark-Cassandra 连接器 3.0.0 时,每个节点都安装了 Spark 和 Cassandra。 Spark 集群有 16 个执行器,每个执行器有 2 个核心,总​​共 32 个核心。我在 Cassandra 数据库中有大约 22 亿行(也是主键),总共有 4.827 个唯一分区键。我正在使用数据帧/数据集,代码在 Java 中,而我也在 spark 配置中使用 .config("spark.sql.shuffle.partitions",96)。在代码中,我选择所有 22 亿行并加入分区键。

  1. 在 Spark GUI 中,我看到有一个包含 32 个任务的广播,这意味着使用了 Sparks Join,而 32 个任务是因为可用的内核。这是否意味着最初将创建 32 个 Spark 分区,这 22 亿行将驻留?

  2. 在使用 Join 之前,我一定要使用 .repartitionByCassandraReplica 吗?我不相信它是必需的,但事实是,如果我尝试使用它,我会收到“找不到符号”的错误。此外,当我的分区键少于 2600 个时,会激活 DirectJoin。

我的目标是利用数据局部性并避免数据传输。

编辑 1

对于问题 1,我浏览了您发送的链接,正如您所说,大小基于 system.size_estimates 表中的任何内容。

  • 根据 nodetool 我有 16 个节点 x ~8.9Gb = 143Gb,复制因子为 3,因此 143/3 = 47.6Gb。所以根据公式,必须有大约 47600/64 = ~744 个 spark 分区。
  • 但是根据 system.size_estimates 表,partitions_count 列的总和为 1883 个分区,mean_partition_size 为 48042720。这意味着表大小为 1883 x 48Mb = 90384Mb 或 ~90Gb,与 143Gb 有点远。

对于问题 2,我的 Cassandra 表是:

 CREATE TABLE experiment(
 experimentid varchar,
 description text,
 rt float,
 intensity float,
 mz float,
 identifier text,
 chemical_formula text,
 filename text,
 PRIMARY KEY ((experimentid),description, rt, intensity, mz, identifier, chemical_formula, filename));

火花代码是:

Dataset<Row> dfexplist = sp.createDataset(experimentlist, Encoders.STRING()).toDF("experimentid");

Dataset<Row> metlistinitial = sp.read().format("org.apache.spark.sql.cassandra")
                .options(new HashMap<String, String>() {
                    {
                        put("keyspace", "mdb");
                        put("table", "experiment");
                    }
                })
                .load().select(col("experimentid"), col("description"), col("intensity")).join(dfexplist, "experimentid").repartition(col("experimentid"));

这是否实现了数据局部性?在我加入时或之前是否有洗牌?最后,我根据分区键重新分区,以避免以后计算中的任何洗牌。

【问题讨论】:

    标签: java apache-spark cassandra spark-cassandra-connector data-partitioning


    【解决方案1】:

    对于问题 1,Spark 分区与核心数或任务数没有直接关系。 Spark 分区由连接器使用估计的表大小(来自 Cassandra system.size_estimates 表)和输入拆分大小来计算。公式为:

    spark_partitions = estimated_table_size / input.split.size_in_mb
    

    如果您想了解详细信息,我已在https://community.datastax.com/questions/11500/ 中进行了说明。

    对于问题 2,使用repartitionByCassandraReplica() 方法来利用数据局部性并最小化改组绝对是一个好主意。但是,我不确定您为什么会收到该错误。如果您使用最少的代码 + 复制问题的数据来更新您的原始问题,我很乐意对其进行审查并相应地更新我的答案。干杯!

    【讨论】:

    • 非常感谢您的宝贵时间!我添加了 EDIT 1。如果您需要任何其他信息,请告诉我。我正在尝试了解数据局部性在我的案例中是如何工作的。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-06-04
    • 2016-08-14
    • 1970-01-01
    • 2017-01-13
    • 2016-01-05
    • 2015-10-08
    • 2015-03-12
    相关资源
    最近更新 更多