【问题标题】:Spark java filter isin method or something else?Spark java过滤器isin方法还是其他?
【发布时间】:2023-03-08 04:46:01
【问题描述】:

我的 cassandra 数据库中有大约 20 亿行,我使用 isin 方法根据包含 4827 个字符串的实验列表进行过滤,如下所示。但是,我注意到在 distinct 命令之后,我只有 4774 个唯一行。任何想法为什么缺少 53? isin 方法是否有阈值/限制?我对实验列表进行了两次和三次检查,它确实有 4827 个字符串,并且数据库中确实存在其他 53 个字符串,因为我可以使用 cqlsh 查询它们。非常感谢任何帮助!

Dataset<Row> df1 = sp.read().format("org.apache.spark.sql.cassandra")
                .options(new HashMap<String, String>() {
                    {
                        put("keyspace", "mdb");
                        put("table", "experiment");
                    }
                })
                .load().select(col("experimentid")).filter(col("experimentid").isin(experimentlist.toArray()));
List<String> tmplist=df1.distinct().as(Encoders.STRING()).collectAsList();
   
System.out.println("tmplist "+tmplist.size());

【问题讨论】:

  • experimentid 分区键吗?
  • 是的!分区键有限制吗?
  • 不,我会建议更有效的解决方案 :-) 给我一些时间,写一下...
  • 非常感谢!我期待你的回答! :)
  • 忘了问 - 集群中有多少个节点,mdb keyspace 的复制因子是多少?

标签: java apache-spark cassandra spark-cassandra-connector


【解决方案1】:

关于“丢失数据”的实际问题 - 当您的集群缺少写入并且没有定期进行修复时,可能会出现问题。 Spark Cassandra 连接器 (SCC) 读取一致性级别为LOCAL_ONE 的数据,并且可能会命中没有所有数据的节点。例如,您可以尝试将一致性级别设置为LOCAL_QUORUM(通过--conf spark.cassandra.input.consistency.level=LOCAL_QUORUM),然后重复实验,但最好确保数据已修复。

您遇到的另一个问题是您正在使用.isin 函数 - 它正在转换为查询SELECT ... FROM table WHERE partition_key IN (list)。查看执行计划:

scala> import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.cassandra._
scala> val data = spark.read.cassandraFormat("m1", "test").load()
data: org.apache.spark.sql.DataFrame = [id: int, m: map<int,string>]

scala> data.filter($"id".isin(Seq(1,2,3,4):_*)).explain
== Physical Plan ==
*Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [id#169,m#170] PushedFilters: [*In(id, [1,2,3,4])], ReadSchema: struct<id:int,m:map<int,string>>

这个查询效率很低,给执行查询的节点增加了额外的负载。在 SCC 2.5.0 中,对此进行了一些优化,但最好使用同样是 introduced in the SCC 2.5.0 的所谓“直接连接”,这样 SCC 将并行执行对特定分区键的请求 - 这样更有效,并且节点的负载越少。您可以按以下方式使用它(唯一的区别是我将它作为“DSE Direct Join”,而在 OSS SCC 中它打印为“Cassandra Direct Join”):

scala> val toJoin = Seq(1,2,3,4).toDF("id")
toJoin: org.apache.spark.sql.DataFrame = [id: int]

scala> val joined = toJoin.join(data, data("id") === toJoin("id"))
joined: org.apache.spark.sql.DataFrame = [id: int, id: int ... 1 more field]

scala> joined.explain
== Physical Plan ==
DSE Direct Join [id = id#189] test.m1 - Reading (id, m) Pushed {}
+- LocalTableScan [id#189]

这种直接连接优化需要显式启用为described in the documentation

【讨论】:

  • 非常感谢您的回答!我会尝试更新我的 SCC,但我不确定它是否让我选择 .directJoin 或 .joinWithCassandraTable,我唯一需要的是 .join 和 .joinWith。我将使用 SCC 2.5.0 更新我的代码,我会检查它并通知您!希望它是直截了当的。
  • 直接连接是内部实现——您只需在 Dataframe 上执行普通的.join,SCC 就会对其进行优化。我已经更新了有关如何启用此优化的答案
  • 非常感谢!会这样做。我将稍微更新一下我的代码(从 SCC 2.4.0 到 SCC 2.5.0)并试一试!
  • 所以,我将SCC 2.5.0的maven依赖添加到我的应用程序中,它无法下载它。我手动下载了它并将其放入我的存储库中,但我仍然无法构建我的项目!
  • 我刚刚收到“正在下载:repo.maven.apache.org/maven2/com/datastax/spark/…”构建失败。它没有说别的。无法下载,好像依赖不存在一样。与此同时,我使用了加入我拥有的 SCC(2.4.0)。 20 亿行需要 45 分钟。但是,在 PCA 期间,我得到“要求失败:在只有 1 行的矩阵上调用 RowMatrix.computeCovariance。无法计算
猜你喜欢
  • 2022-03-07
  • 1970-01-01
  • 1970-01-01
  • 2016-12-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-12-20
相关资源
最近更新 更多