【问题标题】:Spark dataset filter elementsSpark 数据集过滤器元素
【发布时间】:2020-12-20 19:44:15
【问题描述】:

我有 2 个火花数据集: courseDS 和 latestLessonDS ;

这是我的 Spark 数据集 POJO:

Lesson class:
    private List<SessionFilter> info;
    private lessonId;

LatestLesson class:

   private String id:

SessionFilter class:
   private String id;
   private String sessionName;

我想获取所有课程数据,其中 info.id 在课程课程中而不是在最新课程 id 中。

类似这样的:

lessonDS.filter(explode(col("info.id")).notEqual(latestLessonDS.col("value"))).show();

latestLessonDS contain:

100A
200C
300A
400A


lessonDS contain:

1,[[100A,jon],[200C,natalie]]
2,[[100A,jon]]
3,[[600A,jon],[400A,Kim]]

result:
3,[[600A,jon]

【问题讨论】:

  • lessonDS.filter(explode(col("info.id")).notEqual(latestLessonDS.col("value"))).show();这个陈述有点令人困惑......如果您正在寻找左侧不存在的所有 sessionId,那么您的最终结果应该看起来 3[[600A,jon]]

标签: dataframe apache-spark dataset


【解决方案1】:

通常array_contains函数在加入lessonDslatestLessonDs时可以作为加入条件。但是这个函数在这里不起作用,因为连接条件要求lessonDs.info.id所有元素出现在latestLessonDS中。

获得结果的一种方法是分解lessonDs,与latestLessonDs 连接,然后通过比较数字来检查lessonDs.info所有 元素是否存在latestLessonDs 中的条目加入前后的信息元素:

lessonDs
  .withColumn("noOfEntries", size('info))
  .withColumn("id", explode(col("info.id")))
  .join(latestLessonDs, "id" )
  .groupBy("lessonId", "info", "noOfEntries").count()
  .filter("noOfEntries = count")
  .drop("noOfEntries", "count")
  .show(false)

打印

+--------+------------------------------+
|lessonId|info                          |
+--------+------------------------------+
|1       |[[100A, jon], [200C, natalie]]|
|2       |[[100A, jon]]                 |
+--------+------------------------------+

【讨论】:

  • 你能帮我理解什么.groupBy("lessonId", "info", "noOfEntries").count() .filter("noOfEntries = count") .drop("noOfEntries", "count")。这里的陈述是什么意思。为什么我们需要 noOfEntries?
  • count()groupBy的聚合函数。这里计算每个lessonId在连接后剩下多少行,结果存储在名为count的列中。然后通过比较countnoOfEntries,将此数字与加入前的条目数进行比较。了解其工作原理的最简单方法可能是在每行之后插入 show 并检查每行所做的转换。
  • 我刚刚发现我发布的问题中要实现的结果与我所需要的正好相反。我已经更新了我需要的结果。你能帮忙吗?
  • 其实这并不是完全相反的。对于第 3 课,缺少 Kim 的条目。相反,您可以简单地将noOfEntries = count 更改为noOfEntries&lt;&gt;count
【解决方案2】:

如果您的数据集大小 latestLessonDS 足够合理,您可以收集并广播 然后在课程DS 上进行简单的过滤器转换会给你想要的结果。

喜欢

import scala.collection.JavaConversions._
 import spark.implicits._ 
 val bc = spark.sparkContext.broadcast(latestLessonDS.collectAsList().toSeq)
    lessonDS.mapPartitions(itr => {
      val cache = bc.value;
      itr.filter(x => {
        //check in cache 
      })
    })

【讨论】:

  • latestLessonDS 的大小接近 9GB 。这可以使用 SQL 操作来完成吗??
  • latestLessonDS.distinct size?? 9GB 是不同的记录大小吗?
  • 是的,正确..latestLessonDS 中没有重复项
  • 这可以使用sql查询来完成吗,我不知道如何使用SQL查询来实现结果
  • @Coder123 是的,您可以做到...通过使用带有子查询的 (In|Exist) 运算符...在此之前创建带有爆炸信息的数据帧的 tempTables.. 然后执行 spark.sql(" select * from df1 where df1.id IN (select df2.value from df2)");得到想要的结果。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-11-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多