【问题标题】:How to create a DataFrame from a text file in Spark如何从 Spark 中的文本文件创建 DataFrame
【发布时间】:2016-08-14 11:04:10
【问题描述】:

我在 HDFS 上有一个文本文件,我想将其转换为 Spark 中的数据帧。

我正在使用 Spark 上下文加载文件,然后尝试从该文件生成单个列。

val myFile = sc.textFile("file.txt")
val myFile1 = myFile.map(x=>x.split(";"))

这样做之后,我正在尝试以下操作。

myFile1.toDF()

我遇到了一个问题,因为 myFile1 RDD 中的元素现在是数组类型。

我该如何解决这个问题?

【问题讨论】:

  • 我建议您编辑问题的标题,以便更准确地表示问题。
  • 你能帮我取一个恰当的标题吗?我尽量保持简单。

标签: scala apache-spark dataframe apache-spark-sql rdd


【解决方案1】:

带有 PIPE (|) 分隔文件的 txt 文件可以读取为:


df = spark.read.option("sep", "|").option("header", "true").csv("s3://bucket_name/folder_path/file_name.txt")

【讨论】:

    【解决方案2】:

    在使用隐式转换之前,您将无法将其转换为数据框。

    val sqlContext = new SqlContext(new SparkContext())
    
    import sqlContext.implicits._
    

    只有在此之后,您才能将其转换为数据框

    case class Test(id:String,filed2:String)
    
    val myFile = sc.textFile("file.txt")
    
    val df= myFile.map( x => x.split(";") ).map( x=> Test(x(0),x(1)) ).toDF()
    

    【讨论】:

    【解决方案3】:

    您可以读取文件以获得 RDD,然后为其分配架构。创建模式的两种常见方法是使用案例类或模式对象[我的首选]。遵循您可能使用的代码的快速 sn-ps。

    案例类方法

    case class Test(id:String,name:String)
    val myFile = sc.textFile("file.txt")
    val df= myFile.map( x => x.split(";") ).map( x=> Test(x(0),x(1)) ).toDF()
    

    架构方法

    import org.apache.spark.sql.types._
    val schemaString = "id name"
    val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable=true))
    val schema = StructType(fields)
    
    val dfWithSchema = sparkSess.read.option("header","false").schema(schema).csv("file.txt")
    dfWithSchema.show()
    

    第二种方法是我的首选方法,因为案例类有最多 22 个字段的限制,如果您的文件有超过 22 个字段,这将是一个问题!

    【讨论】:

      【解决方案4】:
      val df = spark.read.textFile("abc.txt")
      
      case class Abc (amount:Int, types: String, id:Int)  //columns and data types
      
      val df2 = df.map(rec=>Amount(rec(0).toInt, rec(1), rec(2).toInt))
      rdd2.printSchema
      

      root
       |-- amount: integer (nullable = true)
       |-- types: string (nullable = true)
       |-- id: integer (nullable = true)
      

      【讨论】:

        【解决方案5】:

        我给出了从文本文件创建 DataFrame 的不同方法

        val conf = new SparkConf().setAppName(appName).setMaster("local")
        val sc = SparkContext(conf)
        

        原始文本文件

        val file = sc.textFile("C:\\vikas\\spark\\Interview\\text.txt")
        val fileToDf = file.map(_.split(",")).map{case Array(a,b,c) => 
        (a,b.toInt,c)}.toDF("name","age","city")
        fileToDf.foreach(println(_))
        

        没有架构的 Spark 会话

        import org.apache.spark.sql.SparkSession
        val sparkSess = 
        SparkSession.builder().appName("SparkSessionZipsExample")
        .config(conf).getOrCreate()
        
        val df = sparkSess.read.option("header", 
        "false").csv("C:\\vikas\\spark\\Interview\\text.txt")
        df.show()
        

        使用架构激发会话

        import org.apache.spark.sql.types._
        val schemaString = "name age city"
        val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, 
        StringType, nullable=true))
        val schema = StructType(fields)
        
        val dfWithSchema = sparkSess.read.option("header", 
        "false").schema(schema).csv("C:\\vikas\\spark\\Interview\\text.txt")
        dfWithSchema.show()
        

        使用 sql 上下文

        import org.apache.spark.sql.SQLContext
        
        val fileRdd = 
        sc.textFile("C:\\vikas\\spark\\Interview\\text.txt").map(_.split(",")).map{x 
        => org.apache.spark.sql.Row(x:_*)}
        val sqlDf = sqlCtx.createDataFrame(fileRdd,schema)
        sqlDf.show()
        

        【讨论】:

          【解决方案6】:

          更新 - 从 Spark 1.6 开始,您可以简单地使用内置的 csv 数据源:

          spark: SparkSession = // create the Spark Session
          val df = spark.read.csv("file.txt")
          

          您还可以使用各种选项来控制 CSV 解析,例如:

          val df = spark.read.option("header", "false").csv("file.txt")
          

          对于 Spark 版本 : 最简单的方法是使用spark-csv - 将其包含在您的依赖项中并遵循自述文件,它允许设置自定义分隔符(;),可以读取 CSV 标头(如果有的话),并且可以推断架构 types(需要额外扫描数据)。

          或者,如果您知道架构,您可以创建一个表示它的案例类,并将您的 RDD 元素映射到此类的实例,然后再转换为 DataFrame,例如:

          case class Record(id: Int, name: String)
          
          val myFile1 = myFile.map(x=>x.split(";")).map {
            case Array(id, name) => Record(id.toInt, name)
          } 
          
          myFile1.toDF() // DataFrame will have columns "id" and "name"
          

          【讨论】:

            【解决方案7】:

            我知道我回答这个问题已经很晚了,但我想出了一个不同的答案:

            val rdd = sc.textFile("/home/training/mydata/file.txt")
            
            val text = rdd.map(lines=lines.split(",")).map(arrays=>(ararys(0),arrays(1))).toDF("id","name").show 
            

            【讨论】:

              【解决方案8】:

              如果要使用toDF 方法,则必须将Array[String]RDD 转换为case 类的RDD。例如,你必须这样做:

              case class Test(id:String,filed2:String)
              val myFile = sc.textFile("file.txt")
              val df= myFile.map( x => x.split(";") ).map( x=> Test(x(0),x(1)) ).toDF()
              

              【讨论】:

              • 感谢您的回答马克。它应该有一个绿色的勾号,但 Tzach 在几分之一秒之前就有了类似的答案,我最终接受了他的解决方案。 +1 为您提供帮助。
              猜你喜欢
              • 2019-01-20
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2019-07-08
              • 1970-01-01
              • 1970-01-01
              • 2019-12-02
              • 2021-08-19
              相关资源
              最近更新 更多