【问题标题】:Spark RDD join with Cassandra TableSpark RDD 加入 Cassandra 表
【发布时间】:2020-06-25 08:09:19
【问题描述】:

我正在加入 Spark RDDCassandra table(查找)但无法理解一些事情。

  1. Spark 会从 Cassandra table 拉取 range_start 和 range_end 之间的所有记录,然后将其与 spark 内存中的 RDD 连接起来,否则它会将 RDD 中的所有值下推到 Cassandra 并在那里执行连接
  2. limit(1) 将应用于何处? (CassandraSpark
  3. 无论应用什么限制(1 或 1000),Spark 是否总是从 Cassandra 中提取相同数量的记录?

代码如下:

//creating dataframe with fields required for join with cassandra table
//and converting same to rdd
val df_for_join = src_df.select(src_df("col1"),src_df("col2"))
val rdd_for_join = df_for_join.rdd

val result_rdd = rdd_for_join
.joinWithCassandraTable("my_keyspace", "my_table"
,selectedColumns = SomeColumns("col1","col2","col3","col4")
,SomeColumns("col1", "col2")
).where("created_at >''range_start'' and created_at<= range_end")
.clusteringOrder(Ascending).limit(1)

Cassandra 表详细信息 -

PRIMARY KEY ((col1, col2), created_at) WITH CLUSTERING ORDER BY (created_at ASC)

【问题讨论】:

  • @Atish 这是不同的 API...

标签: scala apache-spark cassandra datastax spark-cassandra-connector


【解决方案1】:

joinWithCassandra 表从传递的 RDD 中提取分区/主键值,并将它们转换为针对 Cassandra 中的分区的单独请求。然后,除此之外,SCC 可能会应用额外的过滤,例如,您是where 条件。如果我没记错的话,但我可能是错的,限制不会完全推送到 Cassandra - 它仍然可能会为每个分区获取 limit 行。

您始终可以通过执行result_rdd.toDebugString 来检查连接发生的位置。对于我的代码:

val df_for_join = Seq((2, 5),(5, 2)).toDF("col1", "col2")
val rdd_for_join = df_for_join.rdd

val result_rdd = rdd_for_join
.joinWithCassandraTable("test", "jt"
,selectedColumns = SomeColumns("col1","col2", "v")
,SomeColumns("col1", "col2")
).where("created_at >'2020-03-13T00:00:00Z' and created_at<= '2020-03-14T00:00:00Z'")
.limit(1)

它给出了以下内容:

scala> result_rdd.toDebugString
res7: String =
(2) CassandraJoinRDD[14] at RDD at CassandraRDD.scala:19 []
 |  MapPartitionsRDD[2] at rdd at <console>:45 []
 |  MapPartitionsRDD[1] at rdd at <console>:45 []
 |  ParallelCollectionRDD[0] at rdd at <console>:45 []

如果您进行“正常”连接,您将获得以下信息:

scala> val rdd1 = sc.parallelize(Seq((2, 5),(5, 2)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:44
scala> val ct = sc.cassandraTable[(Int, Int)]("test", "jt").select("col1", "col2")
ct: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, Int)] = CassandraTableScanRDD[31] at RDD at CassandraRDD.scala:19

scala> rdd1.join(ct)
res15: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[34] at join at <console>:49
scala> rdd1.join(ct).toDebugString
res16: String =
(6) MapPartitionsRDD[37] at join at <console>:49 []
 |  MapPartitionsRDD[36] at join at <console>:49 []
 |  CoGroupedRDD[35] at join at <console>:49 []
 +-(3) ParallelCollectionRDD[21] at parallelize at <console>:44 []
 +-(6) CassandraTableScanRDD[31] at RDD at CassandraRDD.scala:19 []

更多信息请访问corresponding section of SCC documentation

【讨论】:

  • 从您的输出中,您如何知道连接发生在哪里?
  • 它发生在 CassandraJoinRDD 内部,将 rdd_for_join 中的单个值转换为单独的请求
  • 我的错,我应该更清楚。我的意思是我怎么知道加入是发生在 Spark 上还是 Cassandra 上
  • 我尝试使用相同的代码两次,一次限制为 1,另一次限制为 1000。这是为了查看实际从 Cassandra 拉入 Spark 的记录数。如果最终限制适用于 Spark 级别,那么对于这两种情况,我应该将相同数量的行拉入 spark 中。但是从 Spark UI 中,我无法理解在这两种情况下都提取了多少行。
  • 根据源代码,设置限制应该将LIMIT N添加到查询中...我可以启用记录执行的查询以查看实际发送到Cassandra的内容...跨度>
猜你喜欢
  • 2020-05-23
  • 2019-11-12
  • 2020-12-14
  • 2021-02-03
  • 1970-01-01
  • 2016-01-30
  • 2020-10-13
  • 2023-01-05
  • 2015-06-22
相关资源
最近更新 更多