【Question Title】:Spark DF: Schema for type Unit is not supported
【Posted】:2017-03-23 12:05:02
【Question】:

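I'm new to Scala and Spark and am trying to build on some examples I found. Essentially, I'm trying to call a function from a DataFrame that uses the Google API to get the state from a zip code. I have the pieces working separately but not together ;( Here is the piece of code that doesn't work...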

Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type Unit is not supported
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:654)
    at org.apache.spark.sql.functions$.udf(functions.scala:2837)
    at MovieRatings$.getstate(MovieRatings.scala:51)
    at MovieRatings$$anonfun$4.apply(MovieRatings.scala:48)
    at MovieRatings$$anonfun$4.apply(MovieRatings.scala:47)...
Line 51 starts with def getstate = udf {(zipcode:String)...
...
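The trace shows the failure happens inside `functions.udf` itself, before any row is processed: `udf` derives the output column's schema from the lambda's static return type (via `ScalaReflection.schemaFor`), and there is no Spark SQL type for `Unit`. The toy below mimics only that step. `toySchemaFor` and `toyUdf` are hypothetical names, and using the standard library's `ClassTag` is a simplification; Spark's real code uses `TypeTag`s and supports many more types.

```scala
import scala.reflect.ClassTag
import scala.util.Try

object ToySchemaFor {
  // Hypothetical, greatly simplified stand-in for Spark's schema derivation:
  // map a static return type to a Spark SQL type name, or fail like Spark does.
  def toySchemaFor[R](implicit ct: ClassTag[R]): String = {
    val c = ct.runtimeClass
    if (c == classOf[String]) "StringType"
    else if (c == classOf[Int]) "IntegerType"
    else throw new UnsupportedOperationException(s"Schema for type $ct is not supported")
  }

  // Hypothetical udf-like helper: only the function's *type* matters here,
  // the function itself is never invoked.
  def toyUdf[A, R: ClassTag](f: A => R): String = toySchemaFor[R]

  def main(args: Array[String]): Unit = {
    // The lambda returns a String, so a schema can be derived.
    println(toyUdf((zipcode: String) => zipcode.take(2))) // prints StringType
    // The lambda ends with a `val` definition, so R is inferred as Unit and derivation fails.
    println(Try(toyUdf((zipcode: String) => { val s = zipcode.take(2) })))
  }
}
```

As in Spark, the error depends only on the inferred return type, which is why it is raised at the line where `getstate` builds the UDF rather than when the query runs.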

Code:

      userDF.createOrReplaceTempView("Users")
      // SQL statements can be run by using the sql methods provided by Spark
      val zipcodesDF = spark.sql("SELECT distinct zipcode, zipcode as state FROM Users")
      // zipcodesDF.map(zipcodes => "zipcode: " + zipcodes.getAs[String]("zipcode") + getstate(zipcodes.getAs[String]("zipcode"))).show()
      val colNames = zipcodesDF.columns
      val cols = colNames.map(cName => zipcodesDF.col(cName))
      val theColumn = zipcodesDF("state")
      // Apply the UDF to the "state" column only; pass the other columns through
      val mappedCols = cols.map(c =>
        if (c.toString() == theColumn.toString()) getstate(c).as("transformed") else c)
      val newDF = zipcodesDF.select(mappedCols: _*).show()
    }

    def getstate = udf { (zipcode: String) => {
      val url = "http://maps.googleapis.com/maps/api/geocode/json?address=" + zipcode
      val result = scala.io.Source.fromURL(url).mkString
      val address = parse(result)
      val shortnames = for {
        JObject(address_components) <- address
        JField("short_name", short_name) <- address_components
      } yield short_name
      val state = shortnames(3)
      //return state.toString()
      val stater = state.toString()
      // The block ends with a `val` definition, so this lambda's return type is Unit
    }
    }

【Question Discussion】:

  • Your UDF doesn't return anything because you commented out the return state.toString() part.
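To see why the return type ends up as `Unit`: a Scala block evaluates to its last expression, and a `val` definition is a statement whose value is `()`. The sketch below uses `zipcode.take(2)` as a hypothetical stand-in for the Google lookup so it runs offline. Uncommenting `return` is not the idiomatic fix either: inside a lambda, `return` is a non-local return from the enclosing method (here `getstate`), not from the lambda. Ending the block with the value itself is enough.

```scala
object UnitReturnDemo {
  // Mirrors the question's UDF body: the last statement is a `val` definition,
  // so the whole block evaluates to () and the lambda's return type is Unit.
  val broken: String => Unit = (zipcode: String) => {
    val stater = zipcode.take(2) // hypothetical stand-in for the API call
  }

  // Ending the block with the value itself makes the return type String.
  val fixed: String => String = (zipcode: String) => {
    val stater = zipcode.take(2) // hypothetical stand-in for the API call
    stater
  }

  def main(args: Array[String]): Unit = {
    println(broken("90210")) // prints ()
    println(fixed("90210"))  // prints 90
  }
}
```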

Tags: scala function apache-spark dataframe


【Solution 1】:

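Thanks for the replies. I think I figured it out; here is the working code. One thing to note: the Google API has usage limits, so some valid zip codes come back without state information. That's not a problem for me.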

    // Assumed imports (not shown in the original post; json4s ships with Spark)
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.udf
    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    private def loaduserdata(spark: SparkSession): Unit = {
      import spark.implicits._
      // Create an RDD of User objects from a text file, convert it to a DataFrame
      val userDF = spark.sparkContext
        .textFile("examples/src/main/resources/users.csv")
        .map(_.split("::"))
        .map(attributes => users(attributes(0).trim.toInt, attributes(1), attributes(2).trim.toInt, attributes(3), attributes(4)))
        .toDF()
      // Register the DataFrame as a temporary view
      userDF.createOrReplaceTempView("Users")
      // SQL statements can be run by using the sql methods provided by Spark
      val zipcodesDF = spark.sql("SELECT distinct zipcode, substr(zipcode,1,5) as state FROM Users ORDER BY zipcode desc")
      // zipcodesDF.map(zipcodes => "zipcode: " + zipcodes.getAs[String]("zipcode") + getstate(zipcodes.getAs[String]("zipcode"))).show()
      val colNames = zipcodesDF.columns
      val cols = colNames.map(cName => zipcodesDF.col(cName))
      val theColumn = zipcodesDF("state")
      // Replace the "state" column with the UDF result; keep the other columns unchanged
      val mappedCols = cols.map(c =>
        if (c.toString() == theColumn.toString()) getstate(c).as("state") else c)
      val geoDF = zipcodesDF.select(mappedCols: _*) //.show()
      geoDF.createOrReplaceTempView("Geo")
    }

    // A `val`, so the UDF is built once rather than on every reference
    val getstate = udf { (zipcode: String) =>
      val url = "http://maps.googleapis.com/maps/api/geocode/json?address=" + zipcode
      val result = scala.io.Source.fromURL(url).mkString
      val address = parse(result)
      // Keep the short_name of the component whose types are exactly
      // [administrative_area_level_1, political], i.e. the state
      val statenm = for {
        JObject(statename) <- address
        JField("types", JArray(types)) <- statename
        JField("short_name", JString(short_name)) <- statename
        if types == List(JString("administrative_area_level_1"), JString("political"))
      } yield short_name
      // The last expression is the UDF's return value, so it must be a String;
      // ending with a `val` definition would make the return type Unit again
      if (statenm.isEmpty) "N/A" else statenm.head
    }
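The key change in the answer is selecting the state by its `types` rather than by position: `shortnames(3)` in the question depends on how many components the API happens to return. The same selection logic can be sketched without Spark or json4s. `AddressComponent`, `stateOf`, and the sample values are hypothetical (the sample deliberately omits a county component to show the effect of a missing entry), and `contains` is slightly looser than the answer's exact comparison against `[administrative_area_level_1, political]`.

```scala
object StateFromComponents {
  // Hypothetical, simplified model of one entry in a geocoding response's
  // "address_components" array: only the fields the answer reads.
  final case class AddressComponent(shortName: String, types: List[String])

  // Pick the component tagged as a first-level administrative area (a US state);
  // fall back to "N/A" when the response has none, as the answer does.
  def stateOf(components: List[AddressComponent]): String =
    components
      .find(_.types.contains("administrative_area_level_1"))
      .map(_.shortName)
      .getOrElse("N/A")

  // Hypothetical sample input, not a real API response.
  val sample: List[AddressComponent] = List(
    AddressComponent("90210", List("postal_code")),
    AddressComponent("Beverly Hills", List("locality", "political")),
    AddressComponent("CA", List("administrative_area_level_1", "political")),
    AddressComponent("US", List("country", "political"))
  )

  def main(args: Array[String]): Unit = {
    println(stateOf(sample)) // prints CA
    println(stateOf(Nil))    // prints N/A
    // Position-based lookup, as in the question, shifts when a component is missing:
    println(sample.map(_.shortName).lift(3)) // prints Some(US), not the state
  }
}
```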

【Discussion】:
