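【Question title】: Spark update value in the second dataset based on the value from first dataset
【Posted】: 2018-06-06 16:41:27
【Question】:

I have two Spark datasets. The first contains the columns accountid and key, where the key column is an array of the form [key1, key2, key3, ...]. The second contains the columns accountid and key values, the latter as JSON: accountid, {key: value, key: value, ...}. If a key appears for an accountid in the first dataset, I need to update the corresponding value in the second dataset.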

import org.apache.spark.sql.functions._
val df= sc.parallelize(Seq(("20180610114049", "id1","key1"),
  ("20180610114049", "id2","key2"),
  ("20180610114049", "id1","key1"),
  ("20180612114049", "id2","key1"),
  ("20180613114049", "id3","key2"),
  ("20180613114049", "id3","key3")
 )).toDF("date","accountid", "key")
val gp=df.groupBy("accountid","date").agg(collect_list("key"))

+---------+--------------+-----------------+
|accountid|          date|collect_list(key)|
+---------+--------------+-----------------+
|      id2|20180610114049|           [key2]|
|      id1|20180610114049|     [key1, key1]|
|      id3|20180613114049|     [key2, key3]|
|      id2|20180612114049|           [key1]|
+---------+--------------+-----------------+


val df2= sc.parallelize(Seq(("20180610114049", "id1","{'key1':'0.0','key2':'0.0','key3':'0.0'}"),
  ("20180610114049", "id2","{'key1':'0.0','key2':'0.0','key3':'0.0'}"),
  ("20180611114049", "id1","{'key1':'0.0','key2':'0.0','key3':'0.0'}"),
  ("20180612114049", "id2","{'key1':'0.0','key2':'0.0','key3':'0.0'}"),
  ("20180613114049", "id3","{'key1':'0.0','key2':'0.0','key3':'0.0'}")
 )).toDF("date","accountid", "result")

+--------------+---------+----------------------------------------+
|date          |accountid|result                                  |
+--------------+---------+----------------------------------------+
|20180610114049|id1      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
|20180610114049|id2      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
|20180611114049|id1      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
|20180612114049|id2      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
|20180613114049|id3      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
+--------------+---------+----------------------------------------+

Expected output:

+--------------+---------+----------------------------------------+
|date          |accountid|result                                  |
+--------------+---------+----------------------------------------+
|20180610114049|id1      |{'key1':'1.0','key2':'0.0','key3':'0.0'}|
|20180610114049|id2      |{'key1':'0.0','key2':'1.0','key3':'0.0'}|
|20180611114049|id1      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
|20180612114049|id2      |{'key1':'1.0','key2':'0.0','key3':'0.0'}|
|20180613114049|id3      |{'key1':'0.0','key2':'1.0','key3':'1.0'}|
+--------------+---------+----------------------------------------+

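【Comments】:

    Tags: scala apache-spark apache-spark-sql


    【Solution 1】:

    You definitely need a UDF to do this cleanly.

    After joining on date and accountid, you can pass both the array and the JSON to a UDF, parse the JSON inside the UDF with a parser of your choice (I use JSON4S in the example), check whether each key exists in the array and change its value if it does, then convert the result back to JSON and return it from the UDF.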

    val gp=df.groupBy("accountid","date").agg(collect_list("key").as("key"))
    
    val joined = df2.join(gp, Seq("date", "accountid") , "left_outer")
    
    joined.show(false)
    //+--------------+---------+----------------------------------------+------------+
    //|date          |accountid|result                                  |key         |
    //+--------------+---------+----------------------------------------+------------+
    //|20180610114049|id2      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|[key2]      |
    //|20180613114049|id3      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|[key2, key3]|
    //|20180610114049|id1      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|[key1, key1]|
    //|20180611114049|id1      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|null        |
    //|20180612114049|id2      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|[key1]      |
    //+--------------+---------+----------------------------------------+------------+
    
    // the UDF that will do the most work
    // it's important to declare `formats` inside the function
    // to avoid object not Serializable exception
    // Not all cases are covered, use with caution :D
    val convertJsonValues = udf{(json: String, arr: Seq[String]) =>
        import org.json4s.jackson.JsonMethods._
        import org.json4s.JsonDSL._
        implicit val format = org.json4s.DefaultFormats
        // replace single quotes with double
        val kvMap = parse(json.replaceAll("'", """"""")).values.asInstanceOf[Map[String,String]]
        val updatedKV = kvMap.map{ case(k,v) => if(arr.contains(k)) (k,"1.0") else (k,v) }
        compact(render(updatedKV))
    }
    
    // Use when-otherwise and send empty array where `key` is null
    joined.select($"date", 
                  $"accountid",
                  when($"key".isNull, convertJsonValues($"result", array()))
                   .otherwise(convertJsonValues($"result", $"key"))
                   .as("result")
                  ).show(false)
    
    //+--------------+---------+----------------------------------------+
    //|date          |accountid|result                                  |
    //+--------------+---------+----------------------------------------+
    //|20180610114049|id2      |{"key1":"0.0","key2":"1.0","key3":"0.0"}|
    //|20180613114049|id3      |{"key1":"0.0","key2":"1.0","key3":"1.0"}|
    //|20180610114049|id1      |{"key1":"1.0","key2":"0.0","key3":"0.0"}|
    //|20180611114049|id1      |{"key1":"0.0","key2":"0.0","key3":"0.0"}|
    //|20180612114049|id2      |{"key1":"1.0","key2":"0.0","key3":"0.0"}|
    //+--------------+---------+----------------------------------------+
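
The `when`/`otherwise` branch is there only because `key` is null for rows that have no match in the left outer join. As a minimal sketch in plain Scala (no Spark or JSON4S; `UpdateFlags` and `markKeys` are hypothetical names, not part of the answer), the same per-row update can be made null-safe by wrapping the key list in `Option`:

```scala
object UpdateFlags {
  // Hypothetical helper: returns a copy of `kv` in which every key listed in
  // `keys` has the value "1.0". `keys` may be null (no matching row in the
  // join), in which case the map is returned with its values unchanged.
  def markKeys(kv: Map[String, String], keys: Seq[String]): Map[String, String] = {
    val present = Option(keys).map(_.toSet).getOrElse(Set.empty[String])
    kv.map { case (k, v) => if (present.contains(k)) k -> "1.0" else k -> v }
  }

  def main(args: Array[String]): Unit = {
    val row = Map("key1" -> "0.0", "key2" -> "0.0", "key3" -> "0.0")
    println(markKeys(row, Seq("key2", "key3"))) // key2 and key3 are flagged
    println(markKeys(row, null))                // nothing is flagged
  }
}
```

Assuming a null array column reaches a Scala UDF as a null `Seq[String]`, a helper like this could be called on the parsed map inside the UDF, which would make the empty-array fallback unnecessary.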
    

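    【Comments】:

      【Solution 2】:

      You can achieve this with a udf function applied after joining the two dataframes. It also involves converting the JSON to a struct, converting the struct back to JSON, and using a case class (comments are provided for further explanation).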

      import org.apache.spark.sql.Row
      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.types._
      
      //aliasing the collected keys
      val gp = df.groupBy("accountid","date").agg(collect_list("key").as("keys"))
      
      //schema for converting json to struct
      val schema = StructType(Seq(StructField("key1", StringType, true), StructField("key2", StringType, true), StructField("key3", StringType, true)))
      
      //udf function to update the values of the struct; `result` is the case class defined below
      def updateKeysUdf = udf((arr: Seq[String], json: Row) => {
        val Array(a, b, c) = json.schema.fieldNames.map(key => if (arr.contains(key)) "1.0" else json.getAs[String](key))
        result(a, b, c)
      })
      
      //changing the json string to a struct using the above schema
      df2.withColumn("result", from_json(col("result"), schema))
        .as("df2")   //aliasing df2 for joining and selecting
        .join(gp.as("gp"), col("df2.accountid") === col("gp.accountid"), "left")   //joining on accountid only, so each df2 row pairs with every gp row of that account, whatever the date
        .select(col("df2.accountid"), col("df2.date"), to_json(updateKeysUdf(col("gp.keys"), col("df2.result"))).as("result"))  //calling the udf above and converting the struct back to a json string
        .show(false)
      

      where `result` is a case class:

      case class result(key1: String, key2: String, key3: String)
      

      This should give you

      +---------+--------------+----------------------------------------+
      |accountid|date          |result                                  |
      +---------+--------------+----------------------------------------+
      |id3      |20180613114049|{"key1":"0.0","key2":"1.0","key3":"1.0"}|
      |id1      |20180610114049|{"key1":"1.0","key2":"0.0","key3":"0.0"}|
      |id1      |20180611114049|{"key1":"1.0","key2":"0.0","key3":"0.0"}|
      |id2      |20180610114049|{"key1":"0.0","key2":"1.0","key3":"0.0"}|
      |id2      |20180610114049|{"key1":"1.0","key2":"0.0","key3":"0.0"}|
      |id2      |20180612114049|{"key1":"0.0","key2":"1.0","key3":"0.0"}|
      |id2      |20180612114049|{"key1":"1.0","key2":"0.0","key3":"0.0"}|
      +---------+--------------+----------------------------------------+
      

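      I hope this answer helps.

      【Comments】:

      • I know I didn't mention this in the question, but there may be more keys, in which case I can't use this solution with a case class that has three fixed keys.
      • You can add or remove fields to suit your needs @Masterbuilder :)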
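
If the set of keys is not fixed, one possible way around the case class is to parse the JSON into a map instead of a struct. The sketch below is not from either answer. It assumes Spark 3.0 or later (for `transform_values`) and joins on both `date` and `accountid`; `MapBasedUpdate` and `applyKeys` are hypothetical names, and the sample rows are a subset of the question's data.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}

object MapBasedUpdate {
  // Hypothetical helper. Expects df2(date, accountid, result) and
  // gp(accountid, date, keys); returns df2 with the flagged keys set to "1.0".
  def applyKeys(df2: DataFrame, gp: DataFrame): DataFrame =
    df2
      .join(gp, Seq("date", "accountid"), "left_outer")
      // Parse the JSON string into map<string,string>, whatever keys it holds.
      .withColumn("kv", from_json(col("result"), MapType(StringType, StringType)))
      // For unmatched rows `keys` is null, so array_contains yields null and
      // the otherwise branch keeps the original value.
      .withColumn("kv", transform_values(col("kv"), (k, v) =>
        when(array_contains(col("keys"), k), lit("1.0")).otherwise(v)))
      .select(col("date"), col("accountid"), to_json(col("kv")).as("result"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("map-based-update")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("20180610114049", "id1", "key1"),
      ("20180613114049", "id3", "key2"),
      ("20180613114049", "id3", "key3")
    ).toDF("date", "accountid", "key")

    val df2 = Seq(
      ("20180610114049", "id1", "{'key1':'0.0','key2':'0.0','key3':'0.0'}"),
      ("20180611114049", "id1", "{'key1':'0.0','key2':'0.0','key3':'0.0'}"),
      ("20180613114049", "id3", "{'key1':'0.0','key2':'0.0','key3':'0.0'}")
    ).toDF("date", "accountid", "result")

    val gp = df.groupBy("accountid", "date").agg(collect_set("key").as("keys"))

    applyKeys(df2, gp).show(false)
    spark.stop()
  }
}
```

Because the map is built from whatever keys the JSON contains, adding a fourth or fifth key needs no schema or case class change. The trade-off is that every value is treated as a string.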