【问题标题】:Spark join array火花连接数组
【发布时间】:2017-10-03 05:36:14
【问题描述】:

我是 spark 的新手(小时),而且对 Scala 还很陌生。但是,我长期以来一直渴望对两者都更加熟悉。

我有一个相当琐碎的任务。我有两个从两个 JSON 文件导入的数据框。一个带有uuid,text,tag_ids,另一个带有标签id,term 我想生成一个新的json 文件,我可以将其导入solr,其中包含uuid、text、tag_ids、tag_terms。

val text = spark.sqlContext.jsonFile("/tmp/text.js")
val tags = spark.sqlContext.jsonFile("/tmp/tags.js")



text.printSchema()

root
| -- uuid: string (nullable = true)
| -- tag_ids: array (nullable = true)
|    | -- element: string (contiansNull = true)
| -- text: string (nullable = true)

tags.printSchema()
root
| -- id: string (nullable = true)
| -- term: string (nullable = true)


#desired output  
+--------------------+------+---------+------------+
|                uuid| text | tag_ids |   tag_terms|  
+--------------------+------+---------+------------+    
|cf5c1f4c-96e6-4ca...| foo  |    [1,2]| [tag1,tag2]|      
|c9834e2e-0f04-486...| bar  |    [2,3]| [tag2,tag3]|   
+--------------------+--------------+--------------+

很难展示我一直在尝试的所有内容。本质上,.join() 在 tag_ids 是一个数组方面存在问题。我可以explode()tag_ids 并加入tag_terms,但将其重新组装成一个新的 df 以导出仍然超出了我的水平。

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    使用explode的解决方案:

    val result = text
      .withColumn("tag_id", explode($"tag_ids"))
      .join(tags,  $"tag_id" === $"id")
      .groupBy("uuid", "tag_ids")
      .agg(first("text") as "text", collect_list("term") as "tag_terms")
    

    【讨论】:

    • 谢谢这工作。在我的实际数据中,我还有几列,有些行有空值或没有实际的 tag_ids。这很容易通过左连接解决。
    【解决方案2】:

    试试这个:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.{SQLContext, SparkSession}
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    
    val text = spark.sqlContext.jsonFile("/tmp/text.js")
    val tags = spark.sqlContext.jsonFile("/tmp/tags.js")
    
     val df1 = spark.sparkContext.parallelize(text, 4).toDF()
     val df2 = spark.sparkContext.parallelize(tags, 4).toDF()
    
     df1.createOrReplaceTempView("A")
     df2.createOrReplaceTempView("B")
    
    
    spark.sql("select d1.key,d1.value,d2.value1  from A d1  inner join B d2 on d1.key = d2.key").show()
    

    【讨论】:

      猜你喜欢
      • 2016-12-09
      • 2017-04-01
      • 1970-01-01
      • 2016-07-30
      • 2018-11-24
      • 1970-01-01
      • 2021-10-16
      • 2018-08-18
      • 2015-08-07
      相关资源
      最近更新 更多