【问题标题】:Joining two dataframes in scala with a column which are not having exact values将scala中的两个数据框与一个没有精确值的列连接起来
【发布时间】:2020-02-11 23:46:31
【问题描述】:

我尝试将两个数据框合并到一个不具有完全相同值的列。

下面给出的是DF1

+--------+-----+------+
| NUM_ID | TIME|SG1_V |
+--------+-----+------+
|XXXXX01 |1001 |79.0  |
|XXXXX01 |1005 |88.0  |
|XXXXX01 |1010 |99.0  |
|XXXXX01 |1015 |null  |
|XXXXX01 |1020 |100.0 |
|XXXXX02 |1001 |81.0  |
|XXXXX02 |1010 |91.0  |
|XXXXX02 |1050 |93.0  |
|XXXXX02 |1060 |93.0  |
|XXXXX02 |1070 |93.0  |
+--------+-----+------+

下面是DF2

+---------+-----+------+
| NUM_ID  | TIME|SG2_V |
+---------+-----+------+
|XXXXX01  |1001 |  99.0|
|XXXXX01  |1003 |  22.0|
|XXXXX01  |1007 |  85.0|
|XXXXX01  |1011 |  1.0 |

|XXXXX02  |1001 |  22.0|
|XXXXX02  |1009 |  85.0|
|XXXXX02  |1048 |  1.0 |
|XXXXX02  |1052 |  99.0|
+---------+-----+------+

我必须在 NUM_ID 列上加入这两个 DF,这应该完全相同,在 TIME 列上可能/可能不是确切值。

DF2 中的 TIME 可能包含也可能不包含与 DF1 中的精确值。如果该值不准确,我必须加入可用的最接近的值(即 - DF2 中的列值应该 =

看了下图的预期输出会更清楚。

+--------+-----+------+-----+------+
| NUM_ID | TIME|SG1_V | TIME|SG2_V |
+--------+-----+------+-----+------+
|XXXXX01 |1001 |79.0  |1001 |  99.0|
|XXXXX01 |1005 |88.0  |1003 |  22.0|
|XXXXX01 |1010 |99.0  |1007 |  85.0|
|XXXXX01 |1015 |null  |1011 |  1.0 |
|XXXXX01 |1020 |100.0 |1011 |  1.0 |

|XXXXX02 |1001 |81.0  |1001 |  22.0|
|XXXXX02 |1010 |91.0  |1009 |  85.0|
|XXXXX02 |1050 |93.0  |1048 |  1.0 |
|XXXXX02 |1060 |93.0  |1052 |  99.0|
|XXXXX02 |1070 |93.0  |1052 |  99.0|
+--------+-----+------+-----+------+

对于NUM_ID XXXXX01,DF1中的TIME(1005)在DF2中是不可用的,所以它取的是小于1005的最接近的值(1003)。

如何加入,如果没有确切的值,则使用最接近的值加入。

欣赏任何潜在客户。 提前致谢。

【问题讨论】:

    标签: scala dataframe apache-spark hive apache-spark-sql


    【解决方案1】:

    上述解决方案是在将数据帧保存到配置单元表后加入数据帧。

    我尝试通过应用相同的逻辑来加入两个数据帧而不保存到配置单元表中,如下所示。

    val finalSignals = finalABC.as("df1").join(finalXYZ.as("df2"), $"df1.NUM_ID" === $"df2.NUM_ID" && $"df2.TIME"  <= $"df1.TIME", "left").withColumn("rno", row_number.over(Window.partitionBy($"df1.NUM_ID", $"df1.TIME").orderBy($"df1.TIME" - $"df2.TIME"))).select(col("df1.NUM_ID").as("NUM_ID"),col("df1.TIME"),col("df2.NUM_ID").as("NUM_ID2"),col("df1.TIME").as("TIME2"),
    col("rno")).filter("rno == 1")
    

    这是否等同于上面提供的解决方案

    spark.sql("""
         |   SELECT * FROM (
         |     SELECT *,
         |       ROW_NUMBER() OVER (PARTITION BY df1.NUM_ID, df1.TIME ORDER BY (df1.TIME - df2.TIME)) rno
         |     FROM df1 JOIN df2 
         |     ON df2.NUM_ID = df1.NUM_ID AND 
         |        df2.TIME  <= df1.TIME
         |   ) T
         | WHERE T.rno = 1
         |""")
    

    【讨论】:

    • @mazaneicha-请检查一下
    【解决方案2】:

    简单的方法是使用 Spark 的Window functions、row_number() 或 rank() 之一:

    scala> spark.sql("""
         |   SELECT * FROM (
         |     SELECT *,
         |       ROW_NUMBER() OVER (PARTITION BY df1.NUM_ID, df1.TIME ORDER BY (df1.TIME - df2.TIME)) rno
         |     FROM df1 JOIN df2 
         |     ON df2.NUM_ID = df1.NUM_ID AND 
         |        df2.TIME  <= df1.TIME
         |   ) T
         | WHERE T.rno = 1
         |""").show()
    +-------+----+-----+-------+----+-----+---+
    | NUM_ID|TIME|SG1_V| NUM_ID|TIME|SG2_V|rno|
    +-------+----+-----+-------+----+-----+---+
    |XXXXX01|1001| 79.0|XXXXX01|1001| 99.0|  1|
    |XXXXX01|1005| 88.0|XXXXX01|1003| 22.0|  1|
    |XXXXX01|1010| 99.0|XXXXX01|1007| 85.0|  1|
    |XXXXX01|1015| null|XXXXX01|1011|  1.0|  1|
    |XXXXX01|1020|100.0|XXXXX01|1011|  1.0|  1|
    |XXXXX02|1001| 81.0|XXXXX02|1001| 22.0|  1|
    |XXXXX02|1010| 91.0|XXXXX02|1009| 85.0|  1|
    +-------+----+-----+-------+----+-----+---+
    
    scala>
    

    【讨论】:

    • @mazaneicha- 由于有两列名称为 NUM_ID 和 TIME,我将如何为不同的名称设置别名并从该结果中仅选择几列?我尝试做别名但最终出现错误org.apache.spark.sql.AnalysisException: Reference 'NUM_ID' is ambiguous, could be: T.NUM_ID, T.NUM_ID;
    • 在投影中使用列列表而不是* 应该可以,不是吗?喜欢SELECT df1.NUM_ID as NUM_ID1, df2.NUM_ID as NUM_ID2, ... FROM df1 JOIN df2 ...
    【解决方案3】:

    如果您需要使用其中一个字段的特定间隔加入两个字段,您可以执行以下操作:

      import org.apache.spark.sql.functions.when
    
      val spark = SparkSession.builder().master("local[1]").getOrCreate()
    
      val df1 : DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("XXXXX01",1001,79.0),
        Row("XXXXX01",1005,88.0),
        Row("XXXXX01",1010,99.0),
        Row("XXXXX01",1015, null),
        Row("XXXXX01",1020,100.0),
        Row("XXXXX02",1001,81.0))),
        StructType(Seq(StructField("NUM_ID", StringType, false), StructField("TIME", IntegerType, false), StructField("SG1_V", DoubleType, true))))
    
      val df2 : DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("XXXXX01",1001,79.0),
        Row("XXXXX01",1001, 99.0),
        Row("XXXXX01",1003, 22.0),
        Row("XXXXX01",1007, 85.1),
        Row("XXXXX01",1011, 1.0),
        Row("XXXXX02",1001,22.0))),
        StructType(Seq(StructField("NUM_ID", StringType, false), StructField("TIME", IntegerType, false), StructField("SG1_V", DoubleType, false))))
    
      val interval : Int = 10
    
      def main(args: Array[String]) : Unit = {
        df1.join(df2, ((df1("TIME")) - df2("TIME") > lit(interval)) && df1("NUM_ID") === df2("NUM_ID")).show()
      } 
    

    它给出了结果:

    +-------+----+-----+-------+----+-----+
    | NUM_ID|TIME|SG1_V| NUM_ID|TIME|SG1_V|
    +-------+----+-----+-------+----+-----+
    |XXXXX01|1015| null|XXXXX01|1001| 79.0|
    |XXXXX01|1015| null|XXXXX01|1001| 99.0|
    |XXXXX01|1015| null|XXXXX01|1003| 22.0|
    |XXXXX01|1020|100.0|XXXXX01|1001| 79.0|
    |XXXXX01|1020|100.0|XXXXX01|1001| 99.0|
    |XXXXX01|1020|100.0|XXXXX01|1003| 22.0|
    |XXXXX01|1020|100.0|XXXXX01|1007| 85.1|
    +-------+----+-----+-------+----+-----+
    

    【讨论】:

    • @EmiCareOfCell44-考虑的基本 TIME 列属于 DF1。 DF1 TIME 列中的所有值都应存在于结果 DF 中。在上述解决方案中,缺少 1005、1010 次???
    • 为此,您可以使用 left_join 并仅从右侧获取与两个字段连接函数匹配的行。如果您需要示例,我可以更新答案
    猜你喜欢
    • 1970-01-01
    • 2021-10-22
    • 2019-03-13
    • 2016-09-19
    • 1970-01-01
    • 2019-12-03
    • 2022-12-12
    • 2016-06-16
    • 1970-01-01
    相关资源
    最近更新 更多