【问题标题】:Not able to create parquet files in hdfs using spark shell无法使用 spark shell 在 hdfs 中创建镶木地板文件
【发布时间】:2016-10-07 18:47:53
【问题描述】:

我想在 hdfs 中创建 parquet 文件,然后通过 hive 作为外部表读取它。在编写 parquet 文件时,我对 spark-shell 中的阶段失败感到震惊。

Spark 版本:1.5.2 斯卡拉版本:2.10.4 Java:1.7

输入文件:(employee.txt)

1201,satish,25
1202,克里希纳,28
第1203章 39
1204,javed,23
第1205章 23

在 Spark-Shell 中:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val employee = sc.textFile("employee.txt")
employee.first()
val schemaString = "id name age"
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType, StructField, StringType};
val schema = StructType(schemaString.split(" ").map(fieldName ⇒ StructField(fieldName, StringType, true)))
val rowRDD = employee.map(_.split(",")).map(e ⇒ Row(e(0).trim.toInt, e(1), e(2).trim.toInt))
val employeeDF = sqlContext.createDataFrame(rowRDD, schema)
val finalDF = employeeDF.toDF();
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
var WriteParquet= finalDF.write.parquet("/user/myname/schemaParquet")

当我输入我得到的最后一个命令时,

ERROR

SPARK APPLICATION MANAGER

我什至尝试增加执行器内存,它仍然失败。 同样重要的是, finalDF.show() 产生了同样的错误。 所以,我相信我在这里犯了一个逻辑错误。

感谢支持

【问题讨论】:

    标签: scala hadoop apache-spark parquet


    【解决方案1】:

    这里的问题是您正在创建一个所有字段/列类型默认为 StringType 的架构。但是在传递架构中的值时,IdAge 的值正在根据代码转换为 Integer。因此,在运行时抛出 Matcherror。

    架构中列的数据类型应与传递给它的值的数据类型相匹配。试试下面的代码。

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val employee = sc.textFile("employee.txt")
    employee.first()
    //val schemaString = "id name age"
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.types._;
    val schema = StructType(StructField("id", IntegerType, true) :: StructField("name", StringType, true) :: StructField("age", IntegerType, true) :: Nil)
    val rowRDD = employee.map(_.split(" ")).map(e ⇒ Row(e(0).trim.toInt, e(1), e(2).trim.toInt))
    val employeeDF = sqlContext.createDataFrame(rowRDD, schema)
    val finalDF = employeeDF.toDF();
    sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
    var WriteParquet= finalDF.write.parquet("/user/myname/schemaParquet")
    

    这段代码应该可以正常运行。

    【讨论】:

    • 感谢您的回复 Nitin,
    • 完美运行,这里有个小错误,应该是 //val rowRDD = employee.map(_.split(",")).map(e ⇒ Row(e(0 ).trim.toInt, e(1), e(2).trim.toInt)) // 你错过了一个逗号,但非常感谢你的帮助。
    • 不客气。实际上,我在测试时正在处理一个由空格分隔的文本文件。因此,错过了逗号。感谢您指出:)
    • 另外,它报告值拆分不是我的类包的成员,并且当我在 IntelliJ IDE 中尝试整个过程时,它报告应用程序不采用任何参数。你知道我需要添加什么依赖吗?
    • 你使用的是 sbt 还是 Maven?我能够使用 Maven 在 Eclipse 中运行相同的代码。将以下依赖项添加到 pom.xml ` org.apache.sparkspark-core_2.111.6.1org.apache.sparkspark-sql_2.111.6.1`你能粘贴错误信息和sbt/pom. xml 文件?
    猜你喜欢
    • 2021-10-14
    • 2017-03-17
    • 2019-05-05
    • 2016-07-07
    • 1970-01-01
    • 2021-02-18
    • 2020-08-15
    • 1970-01-01
    • 2018-05-06
    相关资源
    最近更新 更多