【问题标题】:How to create an empty dataFrame in Spark如何在 Spark 中创建一个空的 dataFrame
【发布时间】:2018-05-30 13:53:46
【问题描述】:

我有一组基于 Avro 的配置单元表,我需要从中读取数据。由于 Spark-SQL 使用 hive serdes 从 HDFS 读取数据,它比直接读取 HDFS 慢得多。所以我使用数据块 Spark-Avro jar 从底层 HDFS 目录读取 Avro 文件。

一切正常,除非表是空的。我已设法使用以下命令从 hive 表的 .avsc 文件中获取架构,但出现错误“未找到 Avro 文件

val schemaFile = FileSystem.get(sc.hadoopConfiguration).open(new Path("hdfs://myfile.avsc"));

val schema = new Schema.Parser().parse(schemaFile);

spark.read.format("com.databricks.spark.avro").option("avroSchema", schema.toString).load("/tmp/myoutput.avro").show()

解决方法:

我在该目录中放置了一个空文件,并且同样的工作正常。

还有其他方法可以达到同样的效果吗?像conf设置什么的?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql avro spark-avro


    【解决方案1】:

    您不需要使用 emptyRDD。 PySpark 2.4 对我有用:

    empty_df = spark.createDataFrame([], schema) # spark is the Spark Session
    

    如果您已经有来自另一个数据框的架构,您可以这样做:

    schema = some_other_df.schema
    

    如果不这样做,则手动创建空数据框的架构,例如:

    schema = StructType([StructField("col_1", StringType(), True),
                         StructField("col_2", DateType(), True),
                         StructField("col_3", StringType(), True),
                         StructField("col_4", IntegerType(), False)]
                         )
    

    我希望这会有所帮助。

    【讨论】:

    • 您可能应该添加需要导入的数据类型,例如from pyspark.sql.types import StructType, StructField 并且末尾的布尔值指示该列是否可以为空 spark.apache.org/docs/2.1.0/api/python/…
    【解决方案2】:

    类似于 EmiCareOfCell44 的回答,只是更优雅一点,更“空”

    val emptySchema = StructType(Seq())
    val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row],
                    emptySchema)
    

    【讨论】:

      【解决方案3】:

      创建一个空的DataFrame:

      val my_schema = StructType(Seq(
          StructField("field1", StringType, nullable = false),
          StructField("field2", StringType, nullable = false)
        ))
      
      val empty: DataFrame = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], my_schema)
      

      也许这会有所帮助

      【讨论】:

      • 感谢您的回答。我遇到了这种创建空df的方式,但在我的情况下架构是动态的
      【解决方案4】:

      根据您的 Spark 版本,您可以使用反射方式。SchemaConverters 中有一个私有方法可以将 Schema 转换为 StructType..(不知道为什么它是私有的,老实说,它在其他情况下会非常有用)。使用 scala 反射,您应该可以通过以下方式进行操作

      import scala.reflect.runtime.{universe => ru}
      import org.apache.avro.Schema
      import org.apache.spark.sql.Row
      import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
      
      var schemaStr = "{\n \"type\": \"record\",\n \"namespace\": \"com.example\",\n \"name\": \"FullName\",\n \"fields\": [\n { \"name\": \"first\", \"type\": \"string\" },\n      { \"name\": \"last\", \"type\": \"string\" }\n  ]\n }"
      val schema = new Schema.Parser().parse(schemaStr);
      
      val m = ru.runtimeMirror(getClass.getClassLoader)
      val module = m.staticModule("com.databricks.spark.avro.SchemaConverters")
      val im = m.reflectModule(module)
      val method = im.symbol.info.decl(ru.TermName("toSqlType")).asMethod
      
      val objMirror = m.reflect(im.instance)
      val structure = objMirror.reflectMethod(method)(schema).asInstanceOf[com.databricks.spark.avro.SchemaConverters.SchemaType]
      val sqlSchema = structure.dataType.asInstanceOf[StructType]
      val empty = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], sqlSchema)
      
      empty.printSchema
      

      【讨论】:

        猜你喜欢
        • 2017-12-13
        • 1970-01-01
        • 1970-01-01
        • 2016-05-26
        • 2021-06-12
        • 1970-01-01
        • 1970-01-01
        • 2016-04-10
        • 1970-01-01
        相关资源
        最近更新 更多