【Question title】: Spark SQL DataFrame join on one field
【Posted】: 2017-03-15 03:05:41
【Question】:

I'm very new to Spark. I have the following question:

I have 2 tables, Businesses and Inspections. The Businesses table has the fields Business_id, name and address, and the Inspections table has score. I want to find the top 10 scores, so I need to join the two tables on Business_id. I tried 2 approaches, but neither works:

1) Using sqlContext.sql (I wrote the SQL query myself):

    sqlContext.sql("""select CBusinesses.BUSINESS_ID, CBusinesses.name, CBusinesses.address, CBusinesses.city,
      CBusinesses.postal_code, CBusinesses.latitude, CBusinesses.longitude, Inspections_notnull.score
      from CBusinesses, Inspections_notnull
      where CBusinesses.BUSINESS_ID = Inspections_notnull.BUSINESS_ID and Inspections_notnull.score <> 0
      order by Inspections_notnull.score""").show()

2) Using the DataFrame join:

    val df = businessesDF.join(raw_inspectionsDF, businessesDF.col("BUSINESS_ID") == raw_inspectionsDF.col("BUSINESS_ID"))

How should I write this? Thanks!

【Question comments】:

    Tags: scala spark-dataframe


    【Solution 1】:
    val df = businessesDF.join(raw_inspectionsDF, businessesDF("BUSINESS_ID") === raw_inspectionsDF("BUSINESS_ID"))
    

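    This should work. In Scala, == is ordinary object equality and returns a Boolean, which join does not accept as a condition (the call fails to compile with a type mismatch); the Column operator === builds the equality expression instead. See here for more details: https://spark.apache.org/docs/1.5.1/api/java/org/apache/spark/sql/DataFrame.html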
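A minimal, self-contained sketch of both attempts from the question, assuming Spark 2.x running locally and two tiny made-up DataFrames in place of the asker's files (the rows, and in particular the score for business 17, are hypothetical). It uses === for the join condition as in the answer, shows the Seq("business_id") form that keeps a single key column, and sorts descending with a limit, since the original SQL sorts ascending and never restricts the result to 10 rows.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for the sketch; in spark-shell or Databricks `spark` already exists
val spark = SparkSession.builder().master("local[*]").appName("join-top10").getOrCreate()
import spark.implicits._

// Hypothetical miniature versions of the asker's two tables
val businessesDF = Seq((10, "Tiramisu Kitchen"), (17, "GEORGE'S COFFEE SHOP"))
  .toDF("business_id", "name")
val raw_inspectionsDF = Seq((10, 0), (10, 94), (10, 92), (17, 88))
  .toDF("business_id", "score")

// `===` builds a Column expression; Scala's `==` would return a plain Boolean instead
val joinedExpr = businessesDF.join(
  raw_inspectionsDF, businessesDF("business_id") === raw_inspectionsDF("business_id"))

// Joining on the column name keeps one business_id column, avoiding ambiguous references later
val joinedByName = businessesDF.join(raw_inspectionsDF, Seq("business_id"))

// Top 10 non-zero scores, highest first
val top10 = joinedByName
  .filter(col("score") =!= 0)
  .orderBy(col("score").desc)
  .limit(10)
top10.show()

// The same query through Spark SQL on temporary views
businessesDF.createOrReplaceTempView("CBusinesses")
raw_inspectionsDF.createOrReplaceTempView("Inspections_notnull")
val top10Sql = spark.sql("""
  SELECT b.business_id, b.name, i.score
  FROM CBusinesses b
  JOIN Inspections_notnull i ON b.business_id = i.business_id
  WHERE i.score <> 0
  ORDER BY i.score DESC
  LIMIT 10
""")
top10Sql.show()
```

With the expression form, the result carries two business_id columns (one from each side), so selecting business_id afterwards is ambiguous; the name-based form avoids that.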

    【Comments】:

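    • Hi, thanks for the reply. But when I try to show the result, I get an error: val df = businessesDF.join(raw_inspectionsDF, businessesDF("BUSINESS_ID") === raw_inspectionsDF("BUSINESS_ID")).show() org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most recent failure: Lost task 0.0 in stage 27.0 (TID 36, localhost): java.lang.ArrayIndexOutOfBoundsException: 12
    • Can you post the code you are using? The error is on line 12, so more information is needed. A DataFrame join should not throw an array index error.
    • You could instead edit your post and paste your code there. It is very hard to read in a comment.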
    【Solution 2】:

    Sure... I created a case class for each dataset, split each line on tabs, and then converted the RDD to a DataFrame:

    import sqlContext.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import scala.util.{Try, Success, Failure}

    // Returns None when the score field is empty or not a number
    def parseScore(s: String): Option[Int] = {
      Try(s.toInt) match {
        case Success(x) => Some(x)
        case Failure(_) => None
      }
    }

    case class CInspections(business_id: Int, score: Option[Int], date: String, type1: String)

    val baseDir = "/FileStore/tables/484qrxx21488929011080/"
    val raw_inspections = sc.textFile(s"$baseDir/inspections_plus.txt")
    // Split each line on tabs
    val raw_inspectionsmap = raw_inspections.map(line => line.split("\t"))
    val raw_inspectionsRDD = raw_inspectionsmap.map(raw_inspections =>
      CInspections(raw_inspections(0).toInt, parseScore(raw_inspections(1)), raw_inspections(2), raw_inspections(3)))
    val raw_inspectionsDF = raw_inspectionsRDD.toDF
    raw_inspectionsDF.createOrReplaceTempView("Inspections")
    raw_inspectionsDF.printSchema
    //raw_inspectionsDF.show()
    val raw_inspectionsDF_replacenull = raw_inspectionsDF.na.fill(0) // Replacing null values with 0
    raw_inspectionsDF_replacenull.show()
    raw_inspectionsDF_replacenull.createOrReplaceTempView("Inspections_notnull")
    
    
    // For Businesses:
    case class CBusinesses(business_id: Int, name: String, address: String, city: String,
      postal_code: Int, latitude: String, longitude: String, phone_number: String,
      tax_code: String, business_certificate: String, application_date: String,
      owner_name: String, owner_address: String, owner_city: String, owner_state: String,
      owner_zip: String)
    val businesses = sc.textFile(s"$baseDir/businesses_plus.txt")
    val businessesmap = businesses.map(line => line.split("\t"))
    val businessesRDD = businessesmap.map(businesses => CBusinesses(
      businesses(0).toInt, businesses(1), businesses(2), businesses(3), businesses(4).toInt,
      businesses(5), businesses(6), businesses(7), businesses(8), businesses(9),
      businesses(10), businesses(11), businesses(12), businesses(13), businesses(14),
      businesses(15)))
    val businessesDF = businessesRDD.toDF
    businessesDF.createOrReplaceTempView("CBusinesses")
    businessesDF.printSchema
    //businessesDF.show()
    
    It shows the correct result for both DataFrames.

    For Inspections:
    +-----------+-----+--------+--------------------+
    |business_id|score|    date|               type1|
    +-----------+-----+--------+--------------------+
    |         10|    0|20140807|Reinspection/Foll...|
    |         10|   94|20140729|Routine - Unsched...|
    |         10|    0|20140124|Reinspection/Foll...|
    |         10|   92|20140114|Routine - Unsched...|

    For Businesses:
    +-----------+--------------------+--------------------+-------------+-----------+---------+-----------+------------+--------+--------------------+----------------+--------------------+--------------------+-----------------+-------------+---------+
    |business_id|                name|             address|         city|postal_code| latitude|  longitude|phone_number|tax_code|business_certificate|application_date|          owner_name|       owner_address|       owner_city|  owner_state|owner_zip|
    +-----------+--------------------+--------------------+-------------+-----------+---------+-----------+------------+--------+--------------------+----------------+--------------------+--------------------+-----------------+-------------+---------+
    |         10|    Tiramisu Kitchen|       033 Belden Pl|San Francisco|      94104|37.791116|-122.403816|            |     H24|              779059|                |        Tiramisu LLC|        33 Belden St|    San Francisco|           CA|    94104|
    |         17|GEORGE'S COFFEE SHOP|   2200 OAKDALE Ave |         S.F.|      94124|37.741086|-122.401737| 14155531470|     H24|               78443|          4/5/75|"LIEUW, VICTOR & ...| 648 MACARTHUR DRIVE|        DALY CITY|           CA|    94015|
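A likely cause, offered as a hedged diagnosis rather than a confirmed one: java.lang.ArrayIndexOutOfBoundsException: 12 reports the array index that was read (12), not a source line, so at least one line of businesses_plus.txt split into 12 or fewer fields. Java's String.split(regex), which line.split("\t") calls, discards trailing empty strings, so a row whose last columns are blank comes back shorter than expected. The plain-Scala sketch below uses a made-up inspections line to show the difference, plus a hypothetical helper, parseInspection, that returns None for malformed lines instead of throwing; the same idea would apply to CBusinesses with 16 fields.

```scala
import scala.util.Try

// Same fields as the CInspections case class in the question
case class CInspections(business_id: Int, score: Option[Int], date: String, type1: String)

def parseScore(s: String): Option[Int] = Try(s.trim.toInt).toOption

// Hypothetical helper: returns None for short or non-numeric lines instead of
// throwing ArrayIndexOutOfBoundsException or NumberFormatException inside a task
def parseInspection(line: String): Option[CInspections] = {
  val f = line.split("\t", -1) // a negative limit keeps trailing empty fields
  if (f.length < 4) None
  else Try(f(0).trim.toInt).toOption.map(id => CInspections(id, parseScore(f(1)), f(2), f(3)))
}

// Made-up line with an empty score and an empty type1 column at the end
val sample = "10\t\t20140807\t"

println(sample.split("\t").length)     // 3: the trailing empty field is dropped
println(sample.split("\t", -1).length) // 4
println(parseInspection(sample))       // Some(CInspections(10,None,20140807,))
println(parseInspection("abc\t90"))    // None
```

On the RDD this could be applied as raw_inspections.flatMap(line => parseInspection(line).toList), which silently drops lines that do not parse; counting them first is worthwhile so bad data is not hidden.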
    

    【Comments】:

    • It works fine for both DataFrames, but the 2 queries I originally posted to compute the top 10 scores fail with ArrayIndexOutOfBoundsException.
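Why each DataFrame can look fine on its own: Spark evaluates lazily, and show() only needs enough rows to fill its output (20 by default), whereas a join or a sort has to parse every line. A minimal local sketch of the same effect on a plain RDD, assuming Spark 2.x and two made-up input lines: take(1) succeeds because only the first line is parsed, while count(), which touches every line, fails.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

val spark = SparkSession.builder().master("local[*]").appName("lazy-parse").getOrCreate()
val sc = spark.sparkContext

// Made-up input: the second line ends with an empty type1 field
val lines = sc.parallelize(Seq(
  "10\t94\t20140729\tRoutine - Unscheduled",
  "10\t\t20140807\t"
), numSlices = 1) // one partition so that take(1) reads only the first line

// Same splitting as in the question: reading index 3 fails on the second line
val type1 = lines.map(_.split("\t")).map(fields => fields(3))

val firstRow = type1.take(1)      // only the first line is parsed, so this succeeds
val fullScan = Try(type1.count()) // every line is parsed; the task fails with an
                                  // ArrayIndexOutOfBoundsException wrapped in a SparkException
println(firstRow.mkString)        // Routine - Unscheduled
println(fullScan.isFailure)       // true

// Keeping trailing empty fields makes the full scan succeed
val fixed = lines.map(_.split("\t", -1)).map(fields => fields(3))
println(fixed.count())            // 2
```

Running businessesRDD.count() before the join would be a quick way to check whether the parsing step, rather than the join itself, is what fails.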