【问题标题】:How to create an empty DataFrame with a specified schema?如何创建具有指定架构的空 DataFrame?
【发布时间】:2015-10-07 06:42:38
【问题描述】:

我想在 Scala 中使用指定架构在 DataFrame 上创建。我曾尝试使用 JSON 读取(我的意思是读取空文件),但我认为这不是最佳做法。

【问题讨论】:

    标签: scala apache-spark dataframe apache-spark-sql


    【解决方案1】:

    假设您想要一个具有以下架构的数据框:

    root
     |-- k: string (nullable = true)
     |-- v: integer (nullable = false)
    

    您只需为数据框定义架构并使用空的RDD[Row]

    import org.apache.spark.sql.types.{
        StructType, StructField, StringType, IntegerType}
    import org.apache.spark.sql.Row
    
    val schema = StructType(
        StructField("k", StringType, true) ::
        StructField("v", IntegerType, false) :: Nil)
    
    // Spark < 2.0
    // sqlContext.createDataFrame(sc.emptyRDD[Row], schema) 
    spark.createDataFrame(sc.emptyRDD[Row], schema)
    

    PySpark 等效项几乎相同:

    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    
    schema = StructType([
        StructField("k", StringType(), True), StructField("v", IntegerType(), False)
    ])
    
    # or df = sc.parallelize([]).toDF(schema)
    
    # Spark < 2.0 
    # sqlContext.createDataFrame([], schema)
    df = spark.createDataFrame([], schema)
    

    使用带有 Product 类型的隐式编码器(仅限 Scala),例如 Tuple

    import spark.implicits._
    
    Seq.empty[(String, Int)].toDF("k", "v")
    

    或案例类:

    case class KV(k: String, v: Int)
    
    Seq.empty[KV].toDF
    

    spark.emptyDataset[KV].toDF
    

    【讨论】:

    • 这是最合适的答案 - 完整,如果您想快速重现现有数据集的架构,它也很有用。我不知道为什么它不被接受。
    • 如何使用 trait 而不是 case 类创建 df:stackoverflow.com/questions/64276952/…
    【解决方案2】:

    从 Spark 2.0.0 开始,您可以执行以下操作。

    案例分类

    让我们定义一个Person 案例类:

    scala> case class Person(id: Int, name: String)
    defined class Person
    

    导入sparkSparkSession 隐式Encoders

    scala> import spark.implicits._
    import spark.implicits._
    

    并使用 SparkSession 创建一个空的Dataset[Person]

    scala> spark.emptyDataset[Person]
    res0: org.apache.spark.sql.Dataset[Person] = [id: int, name: string]
    

    架构 DSL

    您还可以使用模式“DSL”(请参阅​​org.apache.spark.sql.ColumnName 中的DataFrames 的支持函数)。

    scala> val id = $"id".int
    id: org.apache.spark.sql.types.StructField = StructField(id,IntegerType,true)
    
    scala> val name = $"name".string
    name: org.apache.spark.sql.types.StructField = StructField(name,StringType,true)
    
    scala> import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.types.StructType
    
    scala> val mySchema = StructType(id :: name :: Nil)
    mySchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true))
    
    scala> import org.apache.spark.sql.Row
    import org.apache.spark.sql.Row
    
    scala> val emptyDF = spark.createDataFrame(sc.emptyRDD[Row], mySchema)
    emptyDF: org.apache.spark.sql.DataFrame = [id: int, name: string]
    
    scala> emptyDF.printSchema
    root
     |-- id: integer (nullable = true)
     |-- name: string (nullable = true)
    

    【讨论】:

    • 嗨,编译器说我的模块上不存在spark.emptyDataset,如何使用它?有一些(正确的)类似于(不正确的)val df = apache.spark.emptyDataset[RawData]?
    • @PeterKrauss spark 是您使用 SparkSession.builder 创建的值,而不是 org.apache.spark 包的一部分。有两个 spark 名称正在使用中。这是开箱即用的spark spark-shell
    • 谢谢雅克。我更正了:SparkSession.builder 对象作为参数传递(似乎是最好的解决方案)从第一次常规初始化开始,现在正在运行。
    • 有没有办法使用特征而不是案例类来创建空数据框:stackoverflow.com/questions/64276952/…
    【解决方案3】:
    import scala.reflect.runtime.{universe => ru}
    def createEmptyDataFrame[T: ru.TypeTag] =
        hiveContext.createDataFrame(sc.emptyRDD[Row],
          ScalaReflection.schemaFor(ru.typeTag[T].tpe).dataType.asInstanceOf[StructType]
        )
      case class RawData(id: String, firstname: String, lastname: String, age: Int)
      val sourceDF = createEmptyDataFrame[RawData]
    

    【讨论】:

      【解决方案4】:

      这是在 pyspark 2.0.0 或更高版本中创建空数据框的解决方案。

      from pyspark.sql import SQLContext
      sc = spark.sparkContext
      schema = StructType([StructField('col1', StringType(),False),StructField('col2', IntegerType(), True)])
      sqlContext.createDataFrame(sc.emptyRDD(), schema)
      

      【讨论】:

        【解决方案5】:

        在这里,您可以使用 Scala 中的 StructType 创建架构并传递 Empty RDD,以便您能够创建空表。 以下代码是相同的。

        import org.apache.spark.SparkConf
        import org.apache.spark.SparkContext
        import org.apache.spark.sql._
        import org.apache.spark.sql.Row
        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.types.StructType
        import org.apache.spark.sql.types.StructField
        import org.apache.spark.sql.types.IntegerType
        import org.apache.spark.sql.types.BooleanType
        import org.apache.spark.sql.types.LongType
        import org.apache.spark.sql.types.StringType
        
        
        
        //import org.apache.hadoop.hive.serde2.objectinspector.StructField
        
        object EmptyTable extends App {
          val conf = new SparkConf;
          val sc = new SparkContext(conf)
          //create sparksession object
          val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
        
          //Created schema for three columns 
           val schema = StructType(
            StructField("Emp_ID", LongType, true) ::
              StructField("Emp_Name", StringType, false) ::
              StructField("Emp_Salary", LongType, false) :: Nil)
        
              //Created Empty RDD 
        
          var dataRDD = sc.emptyRDD[Row]
        
          //pass rdd and schema to create dataframe
          val newDFSchema = sparkSession.createDataFrame(dataRDD, schema)
        
          newDFSchema.createOrReplaceTempView("tempSchema")
        
          sparkSession.sql("create table Finaltable AS select * from tempSchema")
        
        }
        

        【讨论】:

          【解决方案6】:

          创建空DataSet的Java版本:

          public Dataset<Row> emptyDataSet(){
          
              SparkSession spark = SparkSession.builder().appName("Simple Application")
                          .config("spark.master", "local").getOrCreate();
          
              Dataset<Row> emptyDataSet = spark.createDataFrame(new ArrayList<>(), getSchema());
          
              return emptyDataSet;
          }
          
          public StructType getSchema() {
          
              String schemaString = "column1 column2 column3 column4 column5";
          
              List<StructField> fields = new ArrayList<>();
          
              StructField indexField = DataTypes.createStructField("column0", DataTypes.LongType, true);
              fields.add(indexField);
          
              for (String fieldName : schemaString.split(" ")) {
                  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
                  fields.add(field);
              }
          
              StructType schema = DataTypes.createStructType(fields);
          
              return schema;
          }
          

          【讨论】:

            【解决方案7】:

            从 Spark 2.4.3 开始

            val df = SparkSession.builder().getOrCreate().emptyDataFrame
            

            【讨论】:

            • 这并不能解决问题的架构部分。
            【解决方案8】:

            这有助于测试目的。

            Seq.empty[String].toDF()
            

            【讨论】:

            【解决方案9】:

            我有一个特殊要求,其中我已经有一个数据框,但在特定条件下我必须返回一个空数据框,所以我返回了df.limit(0)

            【讨论】:

              猜你喜欢
              • 2017-01-20
              • 1970-01-01
              • 2020-09-23
              • 1970-01-01
              • 1970-01-01
              • 2016-12-18
              • 1970-01-01
              相关资源
              最近更新 更多