【Question Title】: Find the closest time between two tables in Spark
【Posted】: 2016-12-02 02:28:16
【Question】:

I'm using PySpark, and I have two DataFrames like this:

user         time          bus
 A    2016/07/18 12:00:00   1
 B    2016/07/19 12:00:00   2
 C    2016/07/20 12:00:00   3

bus          time          stop
 1    2016/07/18 11:59:40   sA
 1    2016/07/18 11:59:50   sB
 1    2016/07/18 12:00:05   sC
 2    2016/07/19 11:59:40   sB
 2    2016/07/19 12:00:10   sC
 3    2016/07/20 11:59:55   sD
 3    2016/07/20 12:00:10   sE

Now I want to find which stop each user reported at, by matching the bus number and the closest time in the second table.

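For example, in table 1, user A reported at 2016/07/18 12:00:00 and was on bus 1. The second table has 3 records for bus 1, and the closest time is 2016/07/18 12:00:05 (the third record), so the user is currently at sC.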

The desired output should look like this:

user         time          bus  stop
 A    2016/07/18 12:00:00   1    sC
 B    2016/07/19 12:00:00   2    sC
 C    2016/07/20 12:00:00   3    sD
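As a sanity check of the matching rule (same bus, smallest absolute time difference), here is a minimal plain-Python sketch on the sample data above. It is not Spark code; the helper names `ts` and `closest_stop` are hypothetical and only illustrate the logic.

```python
from datetime import datetime

def ts(s):
    # Parse the "YYYY/MM/DD HH:MM:SS" strings used in the question
    return datetime.strptime(s, "%Y/%m/%d %H:%M:%S")

users = [
    ("A", ts("2016/07/18 12:00:00"), 1),
    ("B", ts("2016/07/19 12:00:00"), 2),
    ("C", ts("2016/07/20 12:00:00"), 3),
]
stops = [
    (1, ts("2016/07/18 11:59:40"), "sA"),
    (1, ts("2016/07/18 11:59:50"), "sB"),
    (1, ts("2016/07/18 12:00:05"), "sC"),
    (2, ts("2016/07/19 11:59:40"), "sB"),
    (2, ts("2016/07/19 12:00:10"), "sC"),
    (3, ts("2016/07/20 11:59:55"), "sD"),
    (3, ts("2016/07/20 12:00:10"), "sE"),
]

def closest_stop(user_time, bus):
    # Keep only records for the same bus (the join condition) ...
    candidates = [(t, stop) for b, t, stop in stops if b == bus]
    # ... then pick the one with the smallest absolute time difference
    _, stop = min(candidates, key=lambda c: abs((c[0] - user_time).total_seconds()))
    return stop

result = [(user, bus, closest_stop(t, bus)) for user, t, bus in users]
print(result)  # [('A', 1, 'sC'), ('B', 2, 'sC'), ('C', 3, 'sD')]
```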

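I have already converted the times to timestamps, so the only remaining problem is finding the closest timestamp among the records with the same bus number.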
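Once the times are numeric timestamps, "closest timestamp for the same bus" is a nearest-neighbour lookup. The sketch below uses a different technique from the accepted answer: it keeps a time-sorted list per bus and binary-searches it with `bisect`, so each lookup only compares the two neighbouring records. It is plain Python, treats the naive times as UTC (an assumption; only differences matter), and the names `index`, `times` and `nearest_stop` are hypothetical.

```python
import bisect
from collections import defaultdict
from datetime import datetime

EPOCH = datetime(1970, 1, 1)

def ts(s):
    # Seconds since the epoch, treating the naive time as UTC (assumption)
    return (datetime.strptime(s, "%Y/%m/%d %H:%M:%S") - EPOCH).total_seconds()

bus_records = [
    (1, "2016/07/18 11:59:40", "sA"), (1, "2016/07/18 11:59:50", "sB"),
    (1, "2016/07/18 12:00:05", "sC"), (2, "2016/07/19 11:59:40", "sB"),
    (2, "2016/07/19 12:00:10", "sC"), (3, "2016/07/20 11:59:55", "sD"),
    (3, "2016/07/20 12:00:10", "sE"),
]

# Per bus: a time-sorted list of (timestamp, stop) plus the bare timestamps
index = defaultdict(list)
for bus, t, stop in bus_records:
    index[bus].append((ts(t), stop))
for rows in index.values():
    rows.sort()
times = {bus: [t for t, _ in rows] for bus, rows in index.items()}

def nearest_stop(bus, user_time):
    # Assumes the bus has at least one record
    rows, keys = index[bus], times[bus]
    t = ts(user_time)
    i = bisect.bisect_left(keys, t)
    # The nearest record is just before, or at/after, the insertion point
    neighbours = [j for j in (i - 1, i) if 0 <= j < len(rows)]
    best = min(neighbours, key=lambda j: abs(keys[j] - t))
    return rows[best][1]

print(nearest_stop(1, "2016/07/18 12:00:00"))  # sC
print(nearest_stop(2, "2016/07/19 12:00:00"))  # sC
print(nearest_stop(3, "2016/07/20 12:00:00"))  # sD
```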

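Since I'm not familiar with SQL yet, I tried to use a map function to find the closest time and its stop. That meant calling sqlContext.sql inside the map function, which Spark doesn't seem to allow: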

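Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.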

So how can I write a SQL query that produces the correct output?
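A minimal sketch of the SQL the question asks for: join on the bus number, rank each user's candidate stops with `ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ABS(time difference))`, and keep rank 1. To keep it runnable without a cluster, it uses Python's built-in `sqlite3` as a stand-in (assumes SQLite 3.25+ for window functions), with `CAST(strftime('%s', ...) AS INTEGER)` playing the role of Spark's `unix_timestamp(...)`. In Spark the same shape of query would be run via `spark.sql`/`sqlContext.sql` on the driver after registering the DataFrames as temporary views (`createOrReplaceTempView` in 2.x, `registerTempTable` in 1.x; on 1.x, window functions need a `HiveContext`). The table names are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE usertime (user TEXT, time TEXT, bus INTEGER);
CREATE TABLE bustime  (bus INTEGER, time TEXT, stop TEXT);
INSERT INTO usertime VALUES
  ('A', '2016-07-18 12:00:00', 1),
  ('B', '2016-07-19 12:00:00', 2),
  ('C', '2016-07-20 12:00:00', 3);
INSERT INTO bustime VALUES
  (1, '2016-07-18 11:59:40', 'sA'), (1, '2016-07-18 11:59:50', 'sB'),
  (1, '2016-07-18 12:00:05', 'sC'), (2, '2016-07-19 11:59:40', 'sB'),
  (2, '2016-07-19 12:00:10', 'sC'), (3, '2016-07-20 11:59:55', 'sD'),
  (3, '2016-07-20 12:00:10', 'sE');
""")

query = """
SELECT user, time, bus, stop FROM (
  SELECT u.user, u.time, u.bus, b.stop,
         ROW_NUMBER() OVER (
           PARTITION BY u.user, u.bus
           -- seconds between the bus record and the user's report
           ORDER BY ABS(CAST(strftime('%s', b.time) AS INTEGER)
                      - CAST(strftime('%s', u.time) AS INTEGER))
         ) AS rn
  FROM usertime u JOIN bustime b ON u.bus = b.bus
) WHERE rn = 1
ORDER BY user
"""
rows = conn.execute(query).fetchall()
for row in rows:
    print(row)
# ('A', '2016-07-18 12:00:00', 1, 'sC')
# ('B', '2016-07-19 12:00:00', 2, 'sC')
# ('C', '2016-07-20 12:00:00', 3, 'sD')
```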

【Question Discussion】:

    Tags: sql apache-spark pyspark apache-spark-sql


    【Solution 1】:

    This can be done with window functions.

    from datetime import datetime

    from pyspark.sql import Row, functions as F
    from pyspark.sql.window import Window

    def tm(s):
        # Parse the "YYYY/MM/DD HH:MM:SS" strings from the question
        return datetime.strptime(s, "%Y/%m/%d %H:%M:%S")

    # Set up data
    userTime = [
        Row(user="A", time=tm("2016/07/18 12:00:00"), bus=1),
        Row(user="B", time=tm("2016/07/19 12:00:00"), bus=2),
        Row(user="C", time=tm("2016/07/20 12:00:00"), bus=3),
    ]

    busTime = [
        Row(bus=1, time=tm("2016/07/18 11:59:40"), stop="sA"),
        Row(bus=1, time=tm("2016/07/18 11:59:50"), stop="sB"),
        Row(bus=1, time=tm("2016/07/18 12:00:05"), stop="sC"),
        Row(bus=2, time=tm("2016/07/19 11:59:40"), stop="sB"),
        Row(bus=2, time=tm("2016/07/19 12:00:10"), stop="sC"),
        Row(bus=3, time=tm("2016/07/20 11:59:55"), stop="sD"),
        Row(bus=3, time=tm("2016/07/20 12:00:10"), stop="sE"),
    ]

    # Create DataFrames; sc (and the SQL context toDF() needs) comes from the pyspark shell.
    # The aliases let the join condition refer to each side unambiguously.
    userDf = sc.parallelize(userTime).toDF().alias("usertime")
    busDf = sc.parallelize(busTime).toDF().alias("bustime")

    # Pair every user report with every record of the same bus
    joinedDF = userDf.join(busDf, F.col("usertime.bus") == F.col("bustime.bus"), "inner").select(
        userDf.user,
        userDf.time.alias("user_time"),
        busDf.bus,
        busDf.time.alias("bus_time"),
        busDf.stop)

    # Absolute difference in seconds between the bus record and the user's report
    additional_cols = joinedDF.withColumn(
        "bus_time_diff",
        F.abs(F.unix_timestamp(F.col("bus_time")) - F.unix_timestamp(F.col("user_time"))))

    # Number each (user, bus) group's rows by time difference and keep the closest one.
    # F.row_number() replaces the deprecated rowNumber() from older Spark versions.
    w = Window.partitionBy("user", "bus").orderBy("bus_time_diff")
    partDf = additional_cols.withColumn("rank", F.row_number().over(w)).filter(F.col("rank") == 1)

    additional_cols.show(20, False)
    partDf.show(20, False)
    

    Output:

    +----+---------------------+---+---------------------+----+-------------+
    |user|user_time            |bus|bus_time             |stop|bus_time_diff|
    +----+---------------------+---+---------------------+----+-------------+
    |A   |2016-07-18 12:00:00.0|1  |2016-07-18 11:59:40.0|sA  |20           |
    |A   |2016-07-18 12:00:00.0|1  |2016-07-18 11:59:50.0|sB  |10           |
    |A   |2016-07-18 12:00:00.0|1  |2016-07-18 12:00:05.0|sC  |5            |
    |B   |2016-07-19 12:00:00.0|2  |2016-07-19 11:59:40.0|sB  |20           |
    |B   |2016-07-19 12:00:00.0|2  |2016-07-19 12:00:10.0|sC  |10           |
    |C   |2016-07-20 12:00:00.0|3  |2016-07-20 11:59:55.0|sD  |5            |
    |C   |2016-07-20 12:00:00.0|3  |2016-07-20 12:00:10.0|sE  |10           |
    +----+---------------------+---+---------------------+----+-------------+
    +----+---------------------+---+---------------------+----+-------------+----+
    |user|user_time            |bus|bus_time             |stop|bus_time_diff|rank|
    +----+---------------------+---+---------------------+----+-------------+----+
    |A   |2016-07-18 12:00:00.0|1  |2016-07-18 12:00:05.0|sC  |5            |1   |
    |B   |2016-07-19 12:00:00.0|2  |2016-07-19 12:00:10.0|sC  |10           |1   |
    |C   |2016-07-20 12:00:00.0|3  |2016-07-20 11:59:55.0|sD  |5            |1   |
    +----+---------------------+---+---------------------+----+-------------+----+
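To make the window step concrete, the sketch below emulates `row_number().over(Window.partitionBy("user", "bus").orderBy("bus_time_diff"))` followed by `filter(rank == 1)` in plain Python, using the joined rows from the first output table (only the `user`, `bus`, `stop` and `bus_time_diff` columns). It is an illustration of the semantics, not how Spark executes it; `row_number_filter` is a hypothetical helper.

```python
from itertools import groupby
from operator import itemgetter

# (user, bus, stop, bus_time_diff) rows, as in the joined table above
joined = [
    ("A", 1, "sA", 20), ("A", 1, "sB", 10), ("A", 1, "sC", 5),
    ("B", 2, "sB", 20), ("B", 2, "sC", 10),
    ("C", 3, "sD", 5), ("C", 3, "sE", 10),
]

def row_number_filter(rows):
    key = itemgetter(0, 1)
    ranked = []
    # PARTITION BY user, bus: group rows by (user, bus)
    for _, group in groupby(sorted(rows, key=key), key=key):
        # ORDER BY bus_time_diff: sort within each group
        ordered = sorted(group, key=itemgetter(3))
        # row_number(): 1, 2, 3, ... within the group
        for rank, row in enumerate(ordered, start=1):
            ranked.append(row + (rank,))
    # filter(rank == 1): keep the closest record per group
    return [r for r in ranked if r[-1] == 1]

print(row_number_filter(joined))
# [('A', 1, 'sC', 5, 1), ('B', 2, 'sC', 10, 1), ('C', 3, 'sD', 5, 1)]
```

Python's stable sort keeps input order among equal differences, but Spark makes no such promise for rows that tie on `bus_time_diff`; adding a second sort column to `orderBy` (for example `bus_time`) would make the choice deterministic.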
    

    【Discussion】:

    • Glad you solved my problem, thank you very much!