【问题标题】:Break big spark sql query into smaller queries and merge it将大的 spark sql 查询分解为较小的查询并将其合并
【发布时间】:2017-07-26 19:21:12
【问题描述】:

我有一个很大的 spark sql 语句,我试图将其分成更小的块以提高代码的可读性。我不想加入它,只是合并结果。

当前工作的 sql 语句-

val dfs = x.map(field => spark.sql(s"
   select ‘test’ as Table_Name,
          '$field' as Column_Name, 
          min($field) as Min_Value, 
          max($field) as Max_Value, 
          approx_count_distinct($field) as Unique_Value_Count,
          (
            SELECT 100 * approx_count_distinct($field)/count(1) 
            from tempdftable
          ) as perc 
   from tempdftable
”))

我正在尝试从上面的 sql 中取出下面的查询

(SELECT 100 * approx_count_distinct($field)/count(1) from tempdftable) as perc

用这个逻辑 -

 val Perce = x.map(field => spark.sql(s"(SELECT 100 * approx_count_distinct($field)/count(1) from parquetDFTable)"))

然后将此 val Perce 与带有以下语句的第一个大 SQL 语句合并,但它不起作用 -

val dfs = x.map(field => spark.sql(s"
  select ‘test’ as Table_Name,
         '$field' as Column_Name, 
         min($field) as Min_Value, 
         max($field) as Max_Value, 
         approx_count_distinct($field) as Unique_Value_Count,
         '"+Perce+ "'
  from tempdftable
”))

我们如何写这个?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql spark-streaming spark-dataframe


    【解决方案1】:

    为什么不全力以赴,将整个表达式转换为 Spark 代码?

    import spark.implicits._
    import org.apache.spark.sql.functions._
    
    val fraction = udf((approxCount: Double, totalCount: Double) => 100 * approxCount/totalCount)
    
    val fields = Seq("colA", "colB", "colC")
    
    val dfs = fields.map(field => {
      tempdftable
        .select(min(field) as "Min_Value", max(field) as "Max_Value", approx_count_distinct(field) as "Unique_Value_Count", count(field) as "Total_Count")
        .withColumn("Table_Name", lit("test"))
        .withColumn("Column_Name", lit(field))
        .withColumn("Perc", fraction('Unique_Value_Count, 'Total_Count))
        .select('Table_Name, 'Column_Name, 'Min_Value, 'Max_Value, 'Unique_Value_Count, 'Perc)
    })
    
    val df = dfs.reduce(_ union _)
    

    在这样的测试示例中:

    val tempdftable = spark.sparkContext.parallelize(List((3.0, 7.0, 2.0), (1.0, 4.0, 10.0), (3.0, 7.0, 2.0), (5.0, 0.0, 2.0))).toDF("colA", "colB", "colC")
    
    tempdftable.show
    
    +----+----+----+
    |colA|colB|colC|
    +----+----+----+
    | 3.0| 7.0| 2.0|
    | 1.0| 4.0|10.0|
    | 3.0| 7.0| 2.0|
    | 5.0| 0.0| 2.0|
    +----+----+----+
    

    我们得到

    df.show
    
    +----------+-----------+---------+---------+------------------+----+
    |Table_Name|Column_Name|Min_Value|Max_Value|Unique_Value_Count|Perc|
    +----------+-----------+---------+---------+------------------+----+
    |      test|       colA|      1.0|      5.0|                 3|75.0|
    |      test|       colB|      0.0|      7.0|                 3|75.0|
    |      test|       colC|      2.0|     10.0|                 2|50.0|
    +----------+-----------+---------+---------+------------------+----+
    

    【讨论】:

    • 谢谢格伦尼!它有帮助,我接受了这个答案,但我擅长 SQL,并且很少有表达式使用分析函数(如 RANK)和纯 Spark,不知道如何实现这些结果。
    • 导入 org.apache.spark.sql.functions._ 为您提供了大多数 (??) sql 函数。包括rank;)
    • 再次感谢格伦尼!请问我可以从哪里获得这些信息,我可以参考的任何文件成为大师:P :)
    • 嗯,是的,好吧...我不得不说 Spark 文档并不是我遇到过的最好的 ;) 就个人而言,我从阅读博文中学到了很多东西(来自 Databricks、Cloudera 和喜欢),但大多数情况下,我刚刚与 Spark 合作了近 3 年。不过,我要说的是,学习 Spark 语法——即使你擅长 sql——也是非常值得的,如果你熟悉 LINQ 或 Java Streams API,那么它不会花你更多的时间在您熟练使用 Spark 之前的几周 :)
    • 谢谢格伦尼!这当然有帮助......! :)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-09-20
    • 1970-01-01
    • 1970-01-01
    • 2016-07-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多