【问题标题】:Scala - Spark Boost GroupBy Computing for multiple DimensionsScala - Spark Boost GroupBy 多维度计算
【发布时间】:2019-05-25 15:35:35
【问题描述】:

我的目标是创建一个包含 4 个维度和 1 个度量的立方体。

这意味着我总共需要计算 16 个 GroupBy。

在我的代码中,您可以看到 4 个维度(性别、年龄、TotalChildren、ProductCategoryName)和 Measure TotalCost。

我已经过滤了我的所有列以删除它为空的任何行。

之后,我逐个计算每个 GroupBy,然后使用 coalesce() 将 csv 绑定到一个文件中。

整个过程大约需要 10 分钟,我认为这太多了。

有什么方法可以增强这个过程吗?也许通过计算其他人的一些 groupby ?

另外我的数据大约是 5GB,所以如果我读取它作为 groupby 的数量的 16 次,这意味着总共 80GB。


这是我的代码

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf


object ComputeCube {

def main(args:Array[String]):Unit= {

val spark: SparkSession = SparkSession.builder()
  .master("local[*]")
  .appName("SparkProject2018")
  .getOrCreate()

import spark.implicits._

val filePath="src/main/resources/dataspark.txt"

val df = spark.read.options(Map("inferSchema"->"true","delimiter"->"|","header"->"true"))
  .csv(filePath).select("Gender", "BirthDate", "TotalCost", "TotalChildren", "ProductCategoryName")

val df2 = df
  .filter("Gender is not null")
  .filter("BirthDate is not null")
  .filter("TotalChildren is not null")
  .filter("ProductCategoryName is not null")

val currentDate = udf{ (dob: java.sql.Date) =>
  import java.time.{LocalDate, Period}
  Period.between(dob.toLocalDate, LocalDate.now).getYears
}

val df3 = df2.withColumn("Age", currentDate($"BirthDate"))


val groupByAll = df3.groupBy("Gender","Age", "TotalChildren", "ProductCategoryName" ).avg("TotalCost")

val groupByGenderAndAgeAndTotalChildren = df3.groupBy("Gender","Age", "TotalChildren").avg("TotalCost")

val groupByGenderAndAgeAndProductCategoryName = df3.groupBy("Gender","Age", "ProductCategoryName" ).avg("TotalCost")

val groupByGenderAndTotalChildrenAndProductCategoryName = df3.groupBy("Gender", "TotalChildren", "ProductCategoryName" ).avg("TotalCost")

val groupByAgeAndTotalChildrenAndProductCategoryName = df3.groupBy("Age", "TotalChildren", "ProductCategoryName" ).avg("TotalCost")

val groupByGenderAndAge = df3.groupBy("Gender","Age").avg("TotalCost")

val groupByGenderAndTotalChildren = df3.groupBy("Gender","TotalChildren").avg("TotalCost")

val groupByGenderAndProductCategoryName = df3.groupBy("Gender","ProductCategoryName" ).avg("TotalCost")

val groupByAgeAndTotalChildren = df3.groupBy("Age","TotalChildren").avg("TotalCost")

val groupByAgeAndProductCategoryName = df3.groupBy("Age","ProductCategoryName" ).avg("TotalCost")

val groupByTotalChildrenAndProductCategoryName = df3.groupBy("TotalChildren","ProductCategoryName" ).avg("TotalCost")

val groupByGender = df3.groupBy("Gender").avg("TotalCost")

val groupByAge = df3.groupBy("Age").avg("TotalCost")

val groupByTotalChildren = df3.groupBy("TotalChildren" ).avg("TotalCost")

val groupByProductCategoryName = df3.groupBy("ProductCategoryName" ).avg("TotalCost")

val groupByNone = df3.groupBy().avg("TotalCost")


groupByAll.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
  .mode("overwrite").save("src/main/resources/All.csv")

groupByGenderAndAgeAndTotalChildren.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
  .mode("overwrite").save("src/main/resources/Gender_Age_TotalChildren.csv")

groupByGenderAndAgeAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
  .mode("overwrite").save("src/main/resources/Gender_Age_ProductCategoryName.csv")

groupByGenderAndTotalChildrenAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
  .mode("overwrite").save("src/main/resources/Gender_TotalChildren_ProductCategoryName.csv")

groupByAgeAndTotalChildrenAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
  .mode("overwrite").save("src/main/resources/Age_TotalChildren_ProductCategoryName.csv")

groupByGenderAndAge.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
  .mode("overwrite").save("src/main/resources/Gender_Age.csv")

groupByGenderAndTotalChildren.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
  .mode("overwrite").save("src/main/resources/Gender_TotalChildren.csv")

groupByGenderAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
  .mode("overwrite").save("src/main/resources/Gender_ProductCategoryName.csv")

groupByAgeAndTotalChildren.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
  .mode("overwrite").save("src/main/resources/Age_TotalChildren.csv")

groupByAgeAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
  .mode("overwrite").save("src/main/resources/Age_ProductCategoryName.csv")

groupByTotalChildrenAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
  .mode("overwrite").save("src/main/resources/TotalChildren_ProductCategoryName.csv")

groupByGender.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
  .mode("overwrite").save("src/main/resources/Gender.csv")

groupByAge.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
  .mode("overwrite").save("src/main/resources/Age.csv")

groupByTotalChildren.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
  .mode("overwrite").save("src/main/resources/TotalChildren.csv")

groupByProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
  .mode("overwrite").save("src/main/resources/ProductCategoryName.csv")

groupByNone.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
  .mode("overwrite").save("src/main/resources/None.csv")

  }
 }

【问题讨论】:

    标签: scala apache-spark intellij-idea apache-spark-sql


    【解决方案1】:
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    
    object Test1 {
      case class SalesValue(Gender:String, BirthDate:String, TotalChildren:Int, ProductCategoryName:String, TotalCost: Int)
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
        import spark.implicits._
        val seq = Seq(("boy", "1990-01-01", 2, "milk", 200), ("girl", "1990-01-01", 2, "milk", 250), ("boy", "1990-01-01", 2, "milk", 270))
    
        val ds = spark.sparkContext.parallelize(seq).map(r => SalesValue(r._1, r._2, r._3, r._4, r._5)).toDS()
        val agg = ds.groupBy("Gender", "BirthDate", "TotalChildren", "ProductCategoryName").agg(sum($"TotalCost").alias("sum"), count($"Gender").alias("count"))
        agg.cache()
        val result1 = agg.groupBy("Gender").agg(sum($"sum")/sum($"count")).show()
      }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-05-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-11-06
      相关资源
      最近更新 更多