【问题标题】:can this use-case be done by lag/any-other function of spark, if so how can this done这个用例可以通过spark的滞后/任何其他功能来完成吗,如果可以的话怎么做
【发布时间】:2021-05-13 18:39:26
【问题描述】:

我正在使用 spark-2.4.1v。我的项目中有一个用例,对于每个日期(process_date),我需要将当天记录与前一天记录一起考虑,并对该数据集执行某些其他操作。 那么如何为此准备数据集呢?我尝试了滞后功能,但没有取得太大的成功。

对于上述用例,给定数据如下:

+----------+----------+----+-------+------------+-----------+
|company_id|  gen_date|year|quarter|total_assets|create_date|
+----------+----------+----+-------+------------+-----------+
| 989856662|2019-01-02|2019|      1| 3900.435058| 2019-09-11|
| 989856665|2019-01-02|2019|      1| 4836.435058| 2019-09-11|
| 989856667|2019-01-02|2019|      1| 5836.435058| 2019-09-11|
| 989856662|2019-01-01|2019|      1| 3800.435058| 2019-09-11|
| 989856665|2019-01-01|2019|      1| 3834.435058| 2019-09-11|
| 989856667|2019-01-01|2019|      1| 5834.435058| 2019-09-11|
| 989856662|2018-12-31|2018|      4| 3700.435058| 2019-09-11|
| 989856665|2018-12-31|2018|      4| 3900.435058| 2019-09-11|
| 989856667|2018-12-31|2018|      4| 5833.435058| 2019-09-11|
| 989856662|2018-12-30|2018|      4| 3832.435058| 2019-09-11|
| 989856665|2018-12-30|2018|      4| 3700.435058| 2019-09-11|
| 989856667|2018-12-30|2018|      4| 5832.435058| 2019-09-11|
+----------+----------+----+-------+------------+-----------+

这里 gen_date 是关键列,对于每个 gen_date ,我需要取 它以前可用的 gen_date 记录。这些将被处理 一起设置,即 process_date 2019-01-02 - 它应该有 2019-01-02 和 2019-01-01 的记录同样适用于 process_date 记录 gen_date 2018-12-30 及其之前的 gen_date 即 2018-12-29,但是 此处 2018-12-29 gen_date 记录不可用,因此应该是 考虑 gen_date 2018-12-30 记录。

在给定的集合中

对于 process_date 2019-01-02 => (gen_date 2019-01-02) 的记录 + (gen_date 2019-01-01) 的记录 对于 process_date 2019-01-01 => (gen_date 2019-01-01) 的记录 + (gen_date 2018-12-31) 的记录 对于 process_date 2018-12-31 => (gen_date 2018-12-31) 的记录 + (gen_date 2018-12-30) 的记录 对于 process_date 2018-12-30 => (gen_date 2018-12-30) 的记录 + 没有以前的 gen_date 记录。

输出应该如下:

+----------+------------+----------+----+-------+------------+-----------+
|company_id|process_date|  gen_date|year|quarter|total_assets|create_date|
+----------+------------+----------+----+-------+------------+-----------+
| 989856662|  2019-01-02|2019-01-02|2019|      1| 3900.435058| 2019-09-11|
| 989856662|  2019-01-02|2019-01-01|2019|      1| 3800.435058| 2019-09-11|
| 989856665|  2019-01-02|2019-01-02|2019|      1| 4836.435058| 2019-09-11|
| 989856665|  2019-01-02|2019-01-01|2019|      1| 3834.435058| 2019-09-11|
| 989856667|  2019-01-02|2019-01-02|2019|      1| 5836.435058| 2019-09-11|
| 989856667|  2019-01-02|2019-01-01|2019|      1| 5834.435058| 2019-09-11|
| 989856662|  2019-01-01|2019-01-01|2019|      1| 3800.435058| 2019-09-11|
| 989856662|  2019-01-01|2018-12-31|2018|      4| 3700.435058| 2019-09-11|
| 989856665|  2019-01-01|2019-01-01|2019|      1| 3834.435058| 2019-09-11|
| 989856665|  2019-01-01|2018-12-31|2018|      4| 3900.435058| 2019-09-11|
| 989856667|  2019-01-01|2019-01-01|2019|      1| 5834.435058| 2019-09-11|
| 989856667|  2019-01-01|2018-12-31|2018|      4| 5833.435058| 2019-09-11|
| 989856662|  2018-12-31|2018-12-31|2018|      4| 3700.435058| 2019-09-11|
| 989856662|  2018-12-31|2018-12-30|2018|      4| 3832.435058| 2019-09-11|
| 989856665|  2018-12-31|2018-12-31|2018|      4| 3900.435058| 2019-09-11|
| 989856665|  2018-12-31|2018-12-30|2018|      4| 3700.435058| 2019-09-11|
| 989856667|  2018-12-31|2018-12-31|2018|      4| 5833.435058| 2019-09-11|
| 989856667|  2018-12-31|2018-12-30|2018|      4| 5832.435058| 2019-09-11|
| 989856662|  2018-12-30|2018-12-30|2018|      4| 3832.435058| 2019-09-11|
| 989856665|  2018-12-30|2018-12-30|2018|      4| 3700.435058| 2019-09-11|
| 989856667|  2018-12-30|2018-12-30|2018|      4| 5832.435058| 2019-09-11|
+----------+------------+----------+----+-------+------------+-----------+

how to achieve above output ?

以下是所附笔记本网址。

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1165111237342523/988191344931748/7035720262824085/latest.html

【问题讨论】:

    标签: dataframe apache-spark apache-spark-sql


    【解决方案1】:

    为了获得给定gen_datecompany_id 的前一天详细信息,您可以像下面的spec 一样使用滞后函数,

    val windowSpec  = Window.partitionBy("company_id").orderBy("gen_date") 
    
    val intermediateDF = finDF
      .withColumn("previous_gen_date", lag("gen_date",1).over(windowSpec))
    

    上述步骤将根据company_id和gen_date为您获取上一代日期,您可以将此数据与您的原始数据结合以获得相关的前一天数据。

    val finalDF = intermediateDF.alias("a")
      .join(finDF.alias("b"), col("a.company_id") === col("b.company_id") &&
        col("a.previous_gen_date") === col("b.gen_date"), "left_outer")
        .select(col("a.*"),
          col("b.year").as("previous_gen_date_year"),
          col("b.quarter").as("previous_gen_date_quarter"),
          col("b.total_assets").as("previous_gen_date_total_assets"),
          col("b.create_date").as("previous_gen_date_create_date")
        )
    

    上述连接将产生前一天的完整数据以及生成日期。

    +----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+
    |company_id|gen_date  |year|quarter|total_assets|create_date|previous_gen_date|previous_gen_date_year|previous_gen_date_quarter|previous_gen_date_total_assets|previous_gen_date_create_date|
    +----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+
    |989856662 |2018-12-30|2018|4      |3832.435058 |2019-09-11 |null             |null                  |null                     |null                          |null                         |
    |989856662 |2018-12-31|2018|4      |3700.435058 |2019-09-11 |2018-12-30       |2018                  |4                        |3832.435058                   |2019-09-11                   |
    |989856662 |2019-01-01|2019|1      |3800.435058 |2019-09-11 |2018-12-31       |2018                  |4                        |3700.435058                   |2019-09-11                   |
    |989856662 |2019-01-02|2019|1      |3900.435058 |2019-09-11 |2019-01-01       |2019                  |1                        |3800.435058                   |2019-09-11                   |
    +----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+
    

    在这里,您的gen_date 也可以充当process_date 列,您可以借此比较任何操作的两天数据。

    【讨论】:

    • 感谢 Shiva ,不确定您是否正确理解了我的要求。对于每天的每条记录,即除了 process_date 之外什么都没有,我们应该有两条两条记录,即当天记录 + 前一天记录....因此近似输出应该包含 2x 记录。我希望你有要求,否则请告诉我。
    猜你喜欢
    • 2023-03-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-10-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多