【问题标题】:Reshape Spark DataFrame from Long to Wide On Large Data Sets在大型数据集上重塑 Spark DataFrame 从长到宽
【发布时间】:2015-11-05 21:23:33
【问题描述】:

我正在尝试使用 Spark DataFrame API 将我的数据框从长到宽重塑。数据集是学生提问的问题和答案的集合。这是一个庞大的数据集,Q(Question) 和 A(Answer) 大约在 1 到 50000 之间。我想收集所有可能的 Q*A 对并使用它们来构建列。如果学生对问题 1 的回答为 1,我们将值 1 分配给第 1_1 列。否则,我们给它一个 0。数据集已经在 S_ID、Q、A 上进行了重复数据删除。

在 R 中,我可以在库 reshape2 中简单地使用 dcast,但我不知道如何使用 Spark。我在下面的链接中找到了旋转的解决方案,但它需要固定数量的不同 Q*A 对。 http://rajasoftware.net/index.php/database/91446/scala-apache-spark-pivot-dataframes-pivot-spark-dataframe

我还尝试使用用户定义的函数连接 Q 和 A 并且它们应用交叉表但是,即使到目前为止我只在示例数据文件上测试我的代码,我仍然从控制台收到以下错误 -

The maximum limit of le6 pairs have been collected, which may not be all of the pairs.  
Please try reducing the amount of distinct items in your columns.

原始数据:

S_ID、Q、A
1, 1, 1
1、2、2
1、3、3
2、1、1
2、2、3
2、3、4
2、4、5

=> 长宽变换后:

S_ID、QA_1_1、QA_2_2、QA_3_3、QA_2_3、QA_3_4、QA_4_5
1, 1, 1, 1, 0, 0, 0
2, 1, 0, 0, 1, 1, 1

R code.  
library(dplyr); library(reshape2);  
df1 <- df %>% group_by(S_ID, Q, A) %>% filter(row_number()==1) %>% mutate(temp=1)  
df1 %>% dcast(S_ID ~ Q + A, value.var="temp", fill=0)  

Spark code.
val fnConcatenate = udf((x: String, y: String) => {"QA_"+ x +"_" + y})
df1 = df.distinct.withColumn("QA", fnConcatenate($"Q", $"A"))
df2 = stat.crosstab("S_ID", "QA")

任何想法将不胜感激。

【问题讨论】:

  • Q(Question) 和 A(Answer) 大约在 1 到 50000 之间 - 这是否意味着您有 50000 或 50000^2 个 QA 对/列?
  • @zero323 Q 和 A 都有数万个值。对于这对,我相信我们的数量不到十倍。

标签: r scala apache-spark apache-spark-sql


【解决方案1】:

您在此处尝试执行的操作在设计上有缺陷,原因有两个:

  1. 您将稀疏数据集替换为密集数据集。它在内存需求和计算方面都很昂贵,而且当您拥有大型数据集时,这几乎不是一个好主意
  2. 您限制了本地处理数据的能力。稍微简化一下 Spark 数据帧只是 RDD[Row] 的包装。这意味着行越大,您在单个分区上放置的空间就越少,因此聚合等操作的成本要高得多,并且需要更多的网络流量。

如果您拥有适当的列式存储,并且可以实现高效压缩或聚合等功能,那么宽表非常有用。从实际的角度来看,几乎所有你可以用宽表做的事情都可以用一个长表来完成,使用组/窗口函数。

您可以尝试的一件事是使用稀疏向量来创建类似宽的格式:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.StringIndexer
import sqlContext.implicits._

df.registerTempTable("df")
val dfComb = sqlContext.sql("SELECT s_id, CONCAT(Q, '\t', A) AS qa FROM df")

val indexer = new StringIndexer()
  .setInputCol("qa")
  .setOutputCol("idx")
  .fit(dfComb)

val indexed = indexer.transform(dfComb)

val n = indexed.agg(max("idx")).first.getDouble(0).toInt + 1

val wideLikeDF = indexed
  .select($"s_id", $"idx")
  .rdd
  .map{case Row(s_id: String, idx: Double) => (s_id, idx.toInt)}
  .groupByKey // This assumes no duplicates
  .mapValues(vals => Vectors.sparse(n, vals.map((_, 1.0)).toArray))
  .toDF("id", "qaVec")

这里很酷的部分是您可以轻松地将其转换为IndexedRowMatrix,例如计算 SVD

val mat = new IndexedRowMatrix(wideLikeDF.map{
  // Here we assume that s_id can be mapped directly to Long
  // If not it has to be indexed
  case Row(id: String, qaVec: SparseVector) => IndexedRow(id.toLong, qaVec)
})

val svd = mat.computeSVD(3)

或到RowMatrix 并获取列统计信息或计算主成分:

val colStats = mat.toRowMatrix.computeColumnSummaryStatistic
val colSims = mat.toRowMatrix.columnSimilarities
val pc = mat.toRowMatrix.computePrincipalComponents(3)

编辑

在 Spark 1.6.0+ 中,您可以使用 pivot 函数。

【讨论】:

  • "在 Spark 1.6.0+ 中,您可以使用数据透视函数。"实际上,由于列数限制为 10K,这对大型数据集没有帮助。 Databricks 的开发人员还表示,更改默认限制可能不安全,这暗示了在编写时对枢轴容量的算法限制。玩具示例是一个推荐系统,它很好,但远远达不到标准,因为某些推荐器肯定需要处理超过 10K 的项目。例如,我需要 10 万个项目才能开始在我的推荐器中发挥作用。
猜你喜欢
  • 1970-01-01
  • 2018-06-15
  • 1970-01-01
  • 2018-05-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多