【问题标题】:Write dataframe from spark cluster to cassandra cluster: Partitioning and Performance Tuning将数据帧从 spark 集群写入 cassandra 集群:分区和性能调优
【发布时间】:2020-09-27 03:55:38
【问题描述】:

我有两个集群 - 1. Cloudera Hadoop-Spark 作业在这里运行 2. Cloud - Cassandra集群,多个DC

在将数据帧从我的 spark 作业写入 cassandra 集群时,我在写入前在 spark 中进行了重新分区 (repartionCount=10)。见下文:

import org.apache.spark.sql.cassandra._
records.repartition(repartitionCount).write.cassandraFormat(table, keySpace)
  .mode(SaveMode.Append)
  .options(options)
  .option(CassandraConnectorConf.LocalDCParam.name, cassandraDC.name)
  .option(CassandraConnectorConf.ConnectionHostParam.name, cassandraDC.hosts)
  .save()

在我的多租户 Spark 集群中,对于具有 20M 条记录的 Spark 批处理加载,以及低于配置,我看到很多任务失败、资源抢占和运行失败。

spark.cassandra.output.batch.grouping.buffer.size=1000
spark.cassandra.output.batch.grouping.key=partition
spark.cassandra.output.concurrent.writes=20 
spark.cassandra.connection.compression=LZ4

我应该如何调整这个?重新分区是罪魁祸首吗?

PS:我一开始的理解是:对于 20M 行的负载,“重新分区”应该将负载平均分配给 executors(每个分区有 2M 行),并且批处理将在这些分区级别(在 2M 行)。但是现在,如果 spark-cassandra-connector 在整个数据帧级别(整个 20M 行)上进行批处理,我怀疑这是否会导致不必要的洗牌。

更新:删除“重新分区”大大降低了我的 cloudera spark 集群的性能(在 spark 级别设置的默认分区是 -spark.sql.shuffle.partitions: 200),所以我深入挖掘了一下,发现我最初的理解是正确的。请注意我的 spark 和 cassandra 集群是不同的。 Datastax spark-cassandra-connector 使用 cassandra 协调器节点为每个分区打开一个连接,所以我决定让它保持不变。正如亚历克斯建议的那样,我已经减少了并发写入,我相信这应该会有所帮助。

【问题讨论】:

    标签: scala apache-spark cassandra datastax-java-driver spark-cassandra-connector


    【解决方案1】:

    您不需要在 Spark 中进行重新分区 - 只需将数据从 Spark 写入 Cassandra,不要尝试更改 Spark Cassandra 连接器的默认值 - 它们在大多数情况下都可以正常工作。你需要看看发生了什么样的阶段失败——很可能你只是因为spark.cassandra.output.concurrent.writes=20而重载了Cassandra(使用默认值(5))——有时更少的写入器有助于更快地写入数据,因为你不会过载Cassandra,并且作业没有重新启动。

    附: partitionspark.cassandra.output.batch.grouping.key - 它不是 Spark 分区,它是 Cassandra 分区,它取决于分区键列的值。

    【讨论】:

    • 非常感谢。随着越来越多的批处理作业并行写入 cassandra 集群,我的集群中出现了很多问题。每个批处理作业尽管很大,但具有很高的基数,其中单个分区 (cassandra) 通常可能少于 1000 行,因此批处理在运行时可能会更小,从而导致对 cassandra 的写入请求更多。此外,我在极少数情况下观察到 cassandra 读取在大量写入期间会下降,尽管写入本身总是非常快。在我的场景中,replica_set 是一个不错的选择,以及上述建议。
    • 如果您的 Cassandra 节点在读取期间经常出现故障,您可能需要调整输入参数。例如,使用LOCAL_ONE 读取通常会使节点过载,而使用LOCAL_QUORUM 读取会减少单个节点的负载,并且因为它不会崩溃,所以它读取速度更快,尽管LOCAL_QUORUMLOCAL_ONE
    • 我们使用 EACH_QUORUM 写入,使用 LOCAL_QUORUM 读取。 Cassandra 并没有停机,但在大量写入期间读取延迟增加了许多倍。
    • 这在意料之中,但主要是你的节点没有关闭
    猜你喜欢
    • 2017-10-11
    • 2019-09-07
    • 2016-04-08
    • 2019-01-18
    • 1970-01-01
    • 2015-08-07
    • 2016-04-16
    • 2017-02-10
    • 2017-02-10
    相关资源
    最近更新 更多