【问题标题】:Convert RDD[String] to RDD[Row] to Dataframe Spark Scala将 RDD[String] 转换为 RDD[Row] 到 Dataframe Spark Scala
【发布时间】:2017-06-13 09:09:02
【问题描述】:

我正在读取一个有很多空格并且需要过滤掉空格的文件。之后我们需要将其转换为数据框。下面的示例输入。

2017123 ¦     ¦10¦running¦00000¦111¦-EXAMPLE

我对此的解决方案是以下函数,它解析所有空格并修剪文件。

def truncateRDD(fileName : String): RDD[String] = {
    val example = sc.textFile(fileName)
    example.map(lines => lines.replaceAll("""[\t\p{Zs}]+""", ""))
}

但是,我不确定如何将其放入数据框中。 sc.textFile 返回 RDD[String]。我尝试了案例类的方式,但问题是我们有 800 个字段模式,案例类不能超过 22。

我正在考虑以某种方式将 RDD[String] 转换为 RDD[Row],这样我就可以使用 createDataFrame 函数。

val DF = spark.createDataFrame(rowRDD, schema)

关于如何做到这一点的任何建议?

【问题讨论】:

    标签: scala hadoop apache-spark dataframe spark-dataframe


    【解决方案1】:

    在你的情况下简单的方法:

    val RowOfRDD = truncateRDD("yourfilename").map(r => Row.fromSeq(r))
    

    如果您使用的是 scala 2.10,如何解决productarity 问题?

    但是,我不确定如何将其放入数据框中。 sc.textFile 返回一个 RDD[字符串]。我尝试了案例类的方式,但问题是我们 有 800 个字段架构,案例类不能超过 22 个。

    是的,有一些限制,例如productarity,但我们可以克服... 您可以对

    准备一个 extends Product 并覆盖方法的案例类。

    喜欢……

    • productArity():Int: 这将返回属性的大小。在我们的例子中,它是 33。所以,我们的实现如下所示:

    • productElement(n:Int):Any: 给定一个索引,这将返回属性。作为保护,我们还有一个默认情况,它会抛出一个IndexOutOfBoundsException 异常:

    • canEqual (that:Any):Boolean:这是三个函数中的最后一个,当对类进行相等检查时,它用作边界条件:


    【讨论】:

    • scala 2.11 on wards arity 问题不存在。上述方法的以下版本的scala适用
    【解决方案2】:

    首先将您的字符串拆分/解析到字段中。

    rdd.map( line => parse(line)) 其中 parse 是一些解析函数。它可以像拆分一样简单,但您可能想要更强大的东西。这将为您提供RDD[Array[String]] 或类似名称。

    然后您可以使用rdd.map(a => Row.fromSeq(a)) 转换为RDD[Row]

    您可以从那里转换为 DataFrame wising sqlContext.createDataFrame(rdd, schema),其中 rdd 是您的 RDD[Row],schema 是您的架构 StructType。

    【讨论】:

    • 我有一个嵌套的 JSON 数组要解析,如何将其转换为数据框?
    猜你喜欢
    • 2021-09-28
    • 2018-03-05
    • 1970-01-01
    • 2018-10-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多