【问题标题】:Spark Overlap algorithm using Dataframes使用 Dataframes 的 Spark Overlap 算法
【发布时间】:2019-02-04 15:53:10
【问题描述】:

给定一个包含字段的数据源:product_id - product - start_time - end_time

我正在尝试使用 Dataframe 函数为相同的product(基于start_timeend_time)构建捕获重叠记录的逻辑。

------------------------------------------------
| product_id | product | start_time | end_time |
------------------------------------------------
|      1     | bottle  |     2      |    4     |
|      2     | bottle  |     3      |    5     |
|      3     | bottle  |     2      |    3     |
|      4     | bottle  |     6      |    7     |
|      1     |   can   |     2      |    4     |
|      2     |   can   |     5      |    6     |
|      3     |   can   |     2      |    4     |

我想在输出中接收

-------------------------------------------------------------------------------------------------
| product_id_a | product_id_b | product | start_time_a | end_time_a | start_time_b | end_time_b |
-------------------------------------------------------------------------------------------------
|       1      |       2      | bottle  |      2       |     4      |      3       |     5      |
|       1      |       3      | bottle  |      2       |     4      |      2       |     3      |

因为bottle_1bottle_2bottle_3有重叠时间,如果满足以下条件,则有两条记录重叠:

  • max(a.start_time, b.start_time) < min(a.end_time, b.end_time)
  • !(a.start_time == b.start_time && a.end_time == b.end_time)
  • a.start_time != b.start_time || a.end_time != b.end_time

最后两个条件只是指定我对start_timeend_time 相等的情况不感兴趣(例如can_1can_3 不在预期结果中,即使它们具有相同的@987654341 @ 和 end_time)。

问题的结构很容易想到使用 RDD 的 MapReduce 解决方案,但我对 Dataframes 的解决方案感兴趣。

提示:有没有可能用groupBy().agg() 指定一个有趣的条件来达到所描述的逻辑?

如有任何进一步的解释,请随时询问

不重复,属于How to aggregate over rolling time window with groups in Spark

不幸的是,在报告的答案中使用了F.lag,在我的情况下,这不是一个足够好的条件:F.lag 仅使用与以前记录的比较,但在报告的示例中无法按预期工作,因为那bottle_1 不会被报告为与 bottle_3 重叠,因为它们不是连续的记录

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql


    【解决方案1】:

    每个条件都可以直接翻译成SQL

    from pyspark.sql.functions import col, least, greatest
    
    cond1 = (
        greatest(col("a.start_time"), col("b.start_time")) < 
        least(col("a.end_time"), col("b.end_time"))
    )
    
    cond2 = ~(
        (col("a.start_time") == col("b.start_time")) & 
        (col("a.end_time") == col("b.end_time"))
    )
    
    cond3 = (
        (col("a.start_time") != col("b.start_time")) | 
        (col("a.end_time") != col("b.end_time"))
    )
    

    所以你可以加入和过滤。

    (df.alias("a").join(df.alias("b"), ["product"]).filter(cond1 & cond2 & cond3))
    

    【讨论】:

      【解决方案2】:

      基于@Andronicus solution,我在纯Python 中提出了这种方法。

      有必要自己加入DataFrame 以检查行是否重叠。当然,您需要使用条件df.product_id &lt; duplicate_df.product_id 省略self(两个相同的Row 和相反的product_ids 重叠)。

      整个代码:

      from pyspark.sql import functions as F
      
      df = spark.createDataFrame(
          [(1, "bottle", 2, 4),
           (2, "bottle", 3, 5),
           (3, "bottle", 2, 3),
           (4, "bottle", 6, 7),
           (1, "can", 2, 4),
           (2, "can", 5, 6),
           (3, "can", 2, 4)], 
           ['product_id', 'product', 'start_time', 'end_time'])
      
      duplicate_df = df
      
      conditions = [df.product == duplicate_df.product,
                    df.product_id < duplicate_df.product_id,
                    df.start_time != duplicate_df.start_time, 
                    df.end_time != duplicate_df.end_time,
                    F.least(df.end_time, duplicate_df.end_time) >
                    F.greatest(df.start_time, duplicate_df.start_time)]
      
      df.join(duplicate_df, conditions)
      

      【讨论】:

        【解决方案3】:

        试试这个:

        df.join(cloneDf, $"label").where($"label" !== $"label1").where($"min" < $"max1").where($"min1" < $"max").show()
        

        您需要制作DataFrame 的笛卡尔积来检查,如果行重叠,您可以随意映射它们。当然,您需要省略 self - 两个相同的 Rows 重叠。

        整个代码:

        val df = SparkEmbedded.ss.createDataFrame(Seq(
          (1, 2, 5),
          (2, 4, 7),
          (3, 6, 9)
        )).toDF("product_id", "min", "max")
        import SparkEmbedded.ss.implicits._
        val cloneDf = df.select(df.columns.map(col):_*)
            .withColumnRenamed("product_id", "product_id1")
            .withColumnRenamed("min", "min1")
            .withColumnRenamed("max", "max1")
        df.crossJoin(cloneDf)
          .where($"product_id" < $"product_id1")
          .where($"min" < $"max1")
          .where($"min1" < $"max").show()
        

        为了清楚起见,我拆分了where 子句。

        结果是:

        +-----+---+---+------+----+----+
        |label|min|max|label1|min1|max1|
        +-----+---+---+------+----+----+
        |    1|  2|  5|     2|   4|   7|
        |    2|  4|  7|     3|   6|   9|
        +-----+---+---+------+----+----+
        

        这个例子是用 Scala 编写的,但是 Python 有类似的 API。

        【讨论】:

        • 非常感谢!两种解决方案(您的和@user11013893)都帮助我解决了我的问题!你的解决方案有where($"label" &lt; $"label1"),它帮助我避免了重复的结果,而另一个提供了更有趣的功能。可能crossJoin 不是必需的:基于product 的普通内部连接(参见示例)就足够了。我不明白为什么我的问题被否决了(我投了赞成票的答案也是如此)。但我认为您的答案在where($"label" &lt; $"label1") 中使用&lt; condition 更有用。我会接受你的
        • 为了提高答案的可读性,您能否编辑您的帖子并使其更接近我报告的示例?所以把where($"label" &lt; $"label1")改成where($"product_id" &lt; $"product_id1"),基于product的join...等等
        • 我建议将 python 代码放在单独的答案中,因为它完全改变了编程风格。
        • 我开始使用 python 使用 ML,但它对我来说太动态了。 Scala 是完美的(我专业地用 Java 开发);>
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2016-10-14
        • 1970-01-01
        • 2017-08-20
        • 2018-11-06
        • 2016-03-27
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多