【问题标题】:Csv Data is not loading properly as Parquet using SparkCSV 数据未正确加载为 Parquet 使用 Spark
【发布时间】:2020-11-09 20:12:50
【问题描述】:

我在 Hive

中有一张桌子
CREATE TABLE tab_data (
  rec_id INT,
  rec_name STRING,
  rec_value DECIMAL(3,1),
  rec_created TIMESTAMP
) STORED AS PARQUET;

我想用 .csv 文件中的数据填充此表

10|customer1|10.0|2016-09-07  08:38:00.0
20|customer2|24.0|2016-09-08  10:45:00.0
30|customer3|35.0|2016-09-10  03:26:00.0
40|customer1|46.0|2016-09-11  08:38:00.0
50|customer2|55.0|2016-09-12  10:45:00.0
60|customer3|62.0|2016-09-13  03:26:00.0
70|customer1|72.0|2016-09-14  08:38:00.0
80|customer2|23.0|2016-09-15  10:45:00.0
90|customer3|30.0|2016-09-16  03:26:00.0

使用 SparkScala 代码如下

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructField, StructType, TimestampType}

object MainApp {

  val spark = SparkSession
    .builder()
    .appName("MainApp")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","200") 
    .getOrCreate()

  val sc = spark.sparkContext

  val inputPath = "hdfs://host.hdfs:8020/..../tab_data.csv"
  val outputPath = "hdfs://host.hdfs:8020/...../warehouse/test.db/tab_data"

  def main(args: Array[String]): Unit = {

    try {

      val DecimalType = DataTypes.createDecimalType(3, 1)

      /**
        * schema
        */
      val schema = StructType(List(StructField("rec_id", IntegerType, true), StructField("rec_name",StringType, true),
        StructField("rec_value",DecimalType),StructField("rec_created",TimestampType, true)))

      /**
        * Reading the data from HDFS 
        */
      val data = spark
        .read
        .option("sep","|")
        .schema(schema)
        .csv(inputPath)

      data.show(truncate = false)
      data.schema.printTreeString()

      /**
        * Writing the data as Parquet
        */
      data
        .write
        .mode(SaveMode.Append)
        .parquet(outputPath)

    } finally {
      sc.stop()    
      spark.stop()
    }
  }
}

问题是我得到了这个输出

+------+--------+---------+-----------+
|rec_id|rec_name|rec_value|rec_created|
+------+--------+---------+-----------+
|null  |null    |null     |null       |
|null  |null    |null     |null       |
|null  |null    |null     |null       |
|null  |null    |null     |null       |
|null  |null    |null     |null       |
|null  |null    |null     |null       |
|null  |null    |null     |null       |
|null  |null    |null     |null       |
|null  |null    |null     |null       |
|null  |null    |null     |null       |
|null  |null    |null     |null       |
|null  |null    |null     |null       |
|null  |null    |null     |null       |


root
 |-- rec_id: integer (nullable = true)
 |-- rec_name: string (nullable = true)
 |-- rec_value: decimal(3,1) (nullable = true)
 |-- rec_created: timestamp (nullable = true)

架构很好,但数据未正确加载到表中

SELECT * FROM tab_data;

+------------------+--------------------+---------------------+-----------------------+--+
| tab_data.rec_id  | tab_data.rec_name  | tab_data.rec_value  | tab_data.rec_created  |
+------------------+--------------------+---------------------+-----------------------+--+
| NULL             | NULL               | NULL                | NULL                  |
| NULL             | NULL               | NULL                | NULL                  |
| NULL             | NULL               | NULL                | NULL                  |
| NULL             | NULL               | NULL                | NULL                  |
| NULL             | NULL               | NULL                | NULL                  |
| NULL             | NULL               | NULL                | NULL                  |
| NULL             | NULL               | NULL                | NULL                  |
| NULL             | NULL               | NULL                | NULL                  |
| NULL             | NULL               | NULL                | NULL                  |

我做错了什么?

我是 Spark 的新手,我们将不胜感激。

【问题讨论】:

  • 您可以直接使用 insertInto 而不是将数据写入 hdfs 位置..因为您的表是托管表
  • 嗨@Srinivas,感谢您的支持。是的!你是对的,它可以这样做,但对我来说,技术和业务规则是别人强加的:-)

标签: scala apache-spark hive parquet hdf


【解决方案1】:

要处理SparkHiveParquet 之间的问题,请按如下方式设置您的SparkSession

  val spark = SparkSession
    .builder()
    .appName("CsvToParquet")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","200") //Change to a more reasonable default number of partitions for our data
    .config("spark.sql.parquet.writeLegacyFormat", true) // To skip issues with data type between Spark and Hive
                                                         // The convention used by Spark to write Parquet data is configurable.
                                                         // This is determined by the property spark.sql.parquet.writeLegacyFormat
                                                         // The default value is false. If set to "true",
                                                         // Spark will use the same convention as Hive for writing the Parquet data.

之后读取.csv数据如下

      val data = spark
        .read
        .option("sep","|")
        .option("timestampFormat","yyyy-MM-dd HH:mm:ss.S") // to read timestamp fields
        .option("inferSchema",false) // by default is false
        .schema(schema)
        .csv(inputPath)

然后将数据写入parquetno compression(默认数据是压缩的)如下

      data
        .write
        .mode(SaveMode.Append)
        .option("compression", "none") // Assuming no data compression
        .parquet(outputPath)

注意:可能Hive无法查询数据的原因是数据默认压缩为snappy格式,而您的CREATE TABLE语句将数据存储为parquet没有压缩。

【讨论】:

  • 嗨@Chema,代码工作正常,我可以查询表格。谢谢。
【解决方案2】:

您在所有列中都获得了null 值,因为String 类型的列之一无法转换为Timestamp 类型。

要将字符串转换为时间戳类型,请在加载 csv 数据时使用此 option("timestampFormat","yyyy-MM-dd HH:mm:ss.S") 选项指定时间戳格式。

检查下面的代码。

架构

scala> val schema = StructType(List(
   StructField("rec_id", IntegerType, true), 
   StructField("rec_name",StringType, true),
   StructField("rec_value",DecimalType(3,1)),
   StructField("rec_created",TimestampType, true))
)

加载 CSV 数据

scala> val df = spark
.read
.option("sep","|")
.option("inferSchema","true")
.option("timestampFormat","yyyy-MM-dd HH:mm:ss.S")
.schema(schema)
.csv("/tmp/sample")

scala> df.show(false)
+------+---------+---------+-------------------+
|rec_id|rec_name |rec_value|rec_created        |
+------+---------+---------+-------------------+
|10    |customer1|10.0     |2016-09-07 08:38:00|
|20    |customer2|24.0     |2016-09-08 10:45:00|
|30    |customer3|35.0     |2016-09-10 03:26:00|
|40    |customer1|46.0     |2016-09-11 08:38:00|
|50    |customer2|55.0     |2016-09-12 10:45:00|
|60    |customer3|62.0     |2016-09-13 03:26:00|
|70    |customer1|72.0     |2016-09-14 08:38:00|
|80    |customer2|23.0     |2016-09-15 10:45:00|
|90    |customer3|30.0     |2016-09-16 03:26:00|
+------+---------+---------+-------------------+

更新

由于表是托管表,所以不需要设置所有这些参数,可以使用insertInto函数将数据插入到表中。

df.write.mode("append").insertInto("tab_data")

【讨论】:

  • 嗨@Srinivas,我应用了你的更改,数据框输出看起来不错,但我做不到SELECT * FROM tab_data; 我收到错误Error: java.io.IOException: parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file ......。感谢您的帮助。
  • 另一个问题是:我为什么要做.option("inferSchema","true")。我不想从数据中推断模式,我认为这有点低效,我自己编写了模式。感谢您的帮助。
  • 你可以删除这个 - option("inferSchema","true")..这不是强制性的..
  • 你能显示 tab_data 的创建表语句吗?并确保在插入此新数据之前..删除该表的所有行或文件..
  • 嗨@Srinivas,tab_data 的create table 声明在帖子的顶部。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-01-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多