【问题标题】:PySpark how to join 2 DataFrames efficiently with non-matching keysPySpark 如何使用不匹配的键有效地连接 2 个 DataFrame
【发布时间】:2020-09-06 00:10:32
【问题描述】:

我有两个不同大小的数据框: df1(2200万行),包含纬度、经度、日期、事件 df2(100K行),包含纬度、经度、日期、温度

对于 df1 中的每一行,我想在表 df2 中找到该日期最接近的匹配位置(由欧几里德距离定义)(表 df2 具有所有可能的日期,而表 df1 没有)。

df1

+----------+-----------+----------+------------+
| Latitude | Longitude |   Date   |   Event    |
+----------+-----------+----------+------------+
|       10 |        10 | 11/10/20 | Water Polo |
|       20 |        20 | 11/22/19 | Cricket    |
+----------+-----------+----------+------------+

df2:

+----------+-----------+----------+---------+
| Latitude | Longitude |   Date   | Weather |
+----------+-----------+----------+---------+
|       20 |        20 | 11/10/20 |      90 |
|       12 |        12 | 11/10/20 |      80 |
|       10 |        10 | 11/22/19 |      34 |
|       18 |        18 | 11/22/19 |      45 |
+----------+-----------+----------+---------+

期望的输出:

+----------+-----------+----------+---------+------------+
| Latitude | Longitude |   Date   | Weather |   Event    |
+----------+-----------+----------+---------+------------+
|       10 |        10 | 11/10/20 |      80 | Water Polo |
|       20 |        20 | 11/22/19 |      45 | Cricket    |
+----------+-----------+----------+---------+------------+

我是 PySpark 的新手,不知道如何以有效的方式编写此查询。

【问题讨论】:

    标签: dataframe apache-spark pyspark


    【解决方案1】:

    首先你可以加入 Date,然后使用 pyspark 内置函数 sqrt, pow 计算你的 Distance,然后取一个 window partitioned by Date 来计算 minimum distance per date(min_Distance),然后 filter 就可以了。

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    
    w=Window().partitionBy("Date")
    
    df1.join(df2.withColumnRenamed("Latitude","Latitude1")\
                .withColumnRenamed("Longitude","Longitude1"),['Date'])\
       .withColumn("Distance",F.sqrt(F.pow(F.col("Latitude")-F.col("Latitude1"),2)+\
                                     F.pow(F.col("Longitude")-F.col("Longitude1"),2)))\
       .withColumn("min_Distance", F.min("Distance").over(w))\
       .filter('Distance=min_Distance')\
       .select("Latitude","Longitude","Date","Weather","Event").show()
    
    #+--------+---------+--------+-------+----------+
    #|Latitude|Longitude|    Date|Weather|     Event|
    #+--------+---------+--------+-------+----------+
    #|      10|       10|11/10/20|     80|Water Polo|
    #|      20|       20|11/22/19|     45|   Cricket|
    #+--------+---------+--------+-------+----------+
    

    【讨论】:

      猜你喜欢
      • 2022-01-04
      • 1970-01-01
      • 1970-01-01
      • 2018-09-28
      • 1970-01-01
      • 2010-11-18
      • 2017-06-04
      • 2012-04-28
      • 2017-01-27
      相关资源
      最近更新 更多