【发布时间】:2020-09-27 03:55:38
【问题描述】:
我有两个集群 - 1. Cloudera Hadoop-Spark 作业在这里运行 2. Cloud - Cassandra集群,多个DC
在将数据帧从我的 spark 作业写入 cassandra 集群时,我在写入前在 spark 中进行了重新分区 (repartionCount=10)。见下文:
import org.apache.spark.sql.cassandra._
records.repartition(repartitionCount).write.cassandraFormat(table, keySpace)
.mode(SaveMode.Append)
.options(options)
.option(CassandraConnectorConf.LocalDCParam.name, cassandraDC.name)
.option(CassandraConnectorConf.ConnectionHostParam.name, cassandraDC.hosts)
.save()
在我的多租户 Spark 集群中,对于具有 20M 条记录的 Spark 批处理加载,以及低于配置,我看到很多任务失败、资源抢占和运行失败。
spark.cassandra.output.batch.grouping.buffer.size=1000
spark.cassandra.output.batch.grouping.key=partition
spark.cassandra.output.concurrent.writes=20
spark.cassandra.connection.compression=LZ4
我应该如何调整这个?重新分区是罪魁祸首吗?
PS:我一开始的理解是:对于 20M 行的负载,“重新分区”应该将负载平均分配给 executors(每个分区有 2M 行),并且批处理将在这些分区级别(在 2M 行)。但是现在,如果 spark-cassandra-connector 在整个数据帧级别(整个 20M 行)上进行批处理,我怀疑这是否会导致不必要的洗牌。
更新:删除“重新分区”大大降低了我的 cloudera spark 集群的性能(在 spark 级别设置的默认分区是 -spark.sql.shuffle.partitions: 200),所以我深入挖掘了一下,发现我最初的理解是正确的。请注意我的 spark 和 cassandra 集群是不同的。 Datastax spark-cassandra-connector 使用 cassandra 协调器节点为每个分区打开一个连接,所以我决定让它保持不变。正如亚历克斯建议的那样,我已经减少了并发写入,我相信这应该会有所帮助。
【问题讨论】:
标签: scala apache-spark cassandra datastax-java-driver spark-cassandra-connector