【发布时间】:2019-05-25 15:35:35
【问题描述】:
我的目标是创建一个包含 4 个维度和 1 个度量的立方体。
这意味着我总共需要计算 16 个 GroupBy。
在我的代码中,您可以看到 4 个维度(性别、年龄、TotalChildren、ProductCategoryName)和 Measure TotalCost。
我已经过滤了我的所有列以删除它为空的任何行。
之后,我逐个计算每个 GroupBy,然后使用 coalesce() 将 csv 绑定到一个文件中。
整个过程大约需要 10 分钟,我认为这太多了。
有什么方法可以增强这个过程吗?也许通过计算其他人的一些 groupby ?
另外我的数据大约是 5GB,所以如果我读取它作为 groupby 的数量的 16 次,这意味着总共 80GB。
这是我的代码
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
object ComputeCube {
def main(args:Array[String]):Unit= {
val spark: SparkSession = SparkSession.builder()
.master("local[*]")
.appName("SparkProject2018")
.getOrCreate()
import spark.implicits._
val filePath="src/main/resources/dataspark.txt"
val df = spark.read.options(Map("inferSchema"->"true","delimiter"->"|","header"->"true"))
.csv(filePath).select("Gender", "BirthDate", "TotalCost", "TotalChildren", "ProductCategoryName")
val df2 = df
.filter("Gender is not null")
.filter("BirthDate is not null")
.filter("TotalChildren is not null")
.filter("ProductCategoryName is not null")
val currentDate = udf{ (dob: java.sql.Date) =>
import java.time.{LocalDate, Period}
Period.between(dob.toLocalDate, LocalDate.now).getYears
}
val df3 = df2.withColumn("Age", currentDate($"BirthDate"))
val groupByAll = df3.groupBy("Gender","Age", "TotalChildren", "ProductCategoryName" ).avg("TotalCost")
val groupByGenderAndAgeAndTotalChildren = df3.groupBy("Gender","Age", "TotalChildren").avg("TotalCost")
val groupByGenderAndAgeAndProductCategoryName = df3.groupBy("Gender","Age", "ProductCategoryName" ).avg("TotalCost")
val groupByGenderAndTotalChildrenAndProductCategoryName = df3.groupBy("Gender", "TotalChildren", "ProductCategoryName" ).avg("TotalCost")
val groupByAgeAndTotalChildrenAndProductCategoryName = df3.groupBy("Age", "TotalChildren", "ProductCategoryName" ).avg("TotalCost")
val groupByGenderAndAge = df3.groupBy("Gender","Age").avg("TotalCost")
val groupByGenderAndTotalChildren = df3.groupBy("Gender","TotalChildren").avg("TotalCost")
val groupByGenderAndProductCategoryName = df3.groupBy("Gender","ProductCategoryName" ).avg("TotalCost")
val groupByAgeAndTotalChildren = df3.groupBy("Age","TotalChildren").avg("TotalCost")
val groupByAgeAndProductCategoryName = df3.groupBy("Age","ProductCategoryName" ).avg("TotalCost")
val groupByTotalChildrenAndProductCategoryName = df3.groupBy("TotalChildren","ProductCategoryName" ).avg("TotalCost")
val groupByGender = df3.groupBy("Gender").avg("TotalCost")
val groupByAge = df3.groupBy("Age").avg("TotalCost")
val groupByTotalChildren = df3.groupBy("TotalChildren" ).avg("TotalCost")
val groupByProductCategoryName = df3.groupBy("ProductCategoryName" ).avg("TotalCost")
val groupByNone = df3.groupBy().avg("TotalCost")
groupByAll.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/All.csv")
groupByGenderAndAgeAndTotalChildren.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Gender_Age_TotalChildren.csv")
groupByGenderAndAgeAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Gender_Age_ProductCategoryName.csv")
groupByGenderAndTotalChildrenAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Gender_TotalChildren_ProductCategoryName.csv")
groupByAgeAndTotalChildrenAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Age_TotalChildren_ProductCategoryName.csv")
groupByGenderAndAge.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Gender_Age.csv")
groupByGenderAndTotalChildren.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Gender_TotalChildren.csv")
groupByGenderAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Gender_ProductCategoryName.csv")
groupByAgeAndTotalChildren.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Age_TotalChildren.csv")
groupByAgeAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Age_ProductCategoryName.csv")
groupByTotalChildrenAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/TotalChildren_ProductCategoryName.csv")
groupByGender.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Gender.csv")
groupByAge.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Age.csv")
groupByTotalChildren.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/TotalChildren.csv")
groupByProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/ProductCategoryName.csv")
groupByNone.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/None.csv")
}
}
【问题讨论】:
标签: scala apache-spark intellij-idea apache-spark-sql