【问题标题】:Spark Scala input empty values according result from self joined dataframe querySpark Scala 根据自联接数据框查询的结果输入空值
【发布时间】:2020-10-06 21:13:27
【问题描述】:

我很难编写我的 spark scala 代码来使用带条件的自联接来填充覆盖范围为空的行。

这是数据:

+----+--------------+----------+--------+
| ID | date_in_days | coverage | values |
+----+--------------+----------+--------+
|  1 | 2020-09-01   |          | 0.128  |
|  1 | 2020-09-03   |        0 | 0.358  |
|  1 | 2020-09-04   |        0 | 0.035  |
|  1 | 2020-09-05   |          |        |
|  1 | 2020-09-06   |          |        |
|  1 | 2020-09-19   |          |        |
|  1 | 2020-09-12   |          |        |
|  1 | 2020-09-18   |          |        |
|  1 | 2020-09-11   |          |        |
|  1 | 2020-09-16   |          |        |
|  1 | 2020-09-21   |       13 | 0.554  |
|  1 | 2020-09-23   |          |        |
|  1 | 2020-09-30   |          |        |
+----+--------------+----------+--------+

预期结果:

+----+--------------+----------+--------+
| ID | date_in_day  | coverage | values |
+----+--------------+----------+--------+
|  1 | 2020-09-01   |       -1 | 0.128  |
|  1 | 2020-09-03   |        0 | 0.358  |
|  1 | 2020-09-04   |        0 | 0.035  |
|  1 | 2020-09-05   |        0 |        |
|  1 | 2020-09-06   |        0 |        |
|  1 | 2020-09-19   |        0 |        |
|  1 | 2020-09-12   |        0 |        |
|  1 | 2020-09-18   |        0 |        |
|  1 | 2020-09-11   |        0 |        |
|  1 | 2020-09-16   |        0 |        |
|  1 | 2020-09-21   |       13 | 0.554  |
|  1 | 2020-09-23   |       -1 |        |
|  1 | 2020-09-30   |       -1 |        |

我想做什么:

对于按日期排序的每个不同 ID(按 ID 分区的数据框)

用例:行覆盖列为空,我们称之为rowEmptycoverage

  1. 在 DF 中查找带有date_in_days > rowEmptycoverage.date_in_dayscoverage >= 0 的第一行。我们就叫它rowFirstDateGreater
  2. 如果rowFirstDateGreater.values > 500 设置rowEmptycoverage.coverage 为0。否则设置为-1。

当我加入 where 时,我有点迷失在混合中......

【问题讨论】:

  • 你能把逻辑解释清楚一点吗?欢迎提供说明所有案例的示例。
  • 是的,不清楚,我试图重新描述让我知道 :)

标签: scala apache-spark


【解决方案1】:

我假设您的意思是值 > 0.500 而不是值 > 500。逻辑仍然不清楚。这里我假设您是按列的顺序搜索date_in_days,而不是按数据帧的顺序。

在任何情况下,我们都可以改进解决方案以满足您的确切需求。总体思路是使用 Window 来获取覆盖范围不为空的下一个日期,检查 values 是否满足所需条件并更新 coverage

如下:

val win = Window.partitionBy("ID").orderBy("date_in_days")
    .rangeBetween(Window.currentRow, Window.unboundedFollowing)

df
  // creating a struct binding coverage and values
  .withColumn("cov_str", when('coverage isNull, lit(null))
                                    .otherwise(struct('coverage, 'values)))
  // finding the first row (starting from the current date, in order of 
  // date_in_days) for which the coverage is not null
  .withColumn("next_cov_str", first('cov_str, ignoreNulls=true) over win)
  // updating coverage. We keep the original value if not null, put 0 if values
  // meets the criteria (that you can change) and -1 otherwise.
  .withColumn("coverage", coalesce(
             'coverage,
             when($"next_cov_str.values" > 0.500, lit(0)),
             lit(-1)
  ))
  .show(false)
+---+-------------------+--------+------+-----------+------------+
|ID |date_in_days       |coverage|values|cov_str    |next_cov_str|
+---+-------------------+--------+------+-----------+------------+
|1  |2020-09-01 00:00:00|-1      |0.128 |null       |[0, 0.358]  |
|1  |2020-09-03 00:00:00|0       |0.358 |[0, 0.358] |[0, 0.358]  |
|1  |2020-09-04 00:00:00|0       |0.035 |[0, 0.035] |[0, 0.035]  |
|1  |2020-09-05 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-06 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-11 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-12 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-16 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-18 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-19 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-21 00:00:00|13      |0.554 |[13, 0.554]|[13, 0.554] |
|1  |2020-09-23 00:00:00|-1      |null  |null       |null        |
|1  |2020-09-30 00:00:00|-1      |null  |null       |null        |
+---+-------------------+--------+------+-----------+------------+

然后您可以使用drop("cov_str", "next_cov_str"),但为了清楚起见,我将它们留在这里。

【讨论】:

  • 所以这就是我所缺少的,窗户,我正在转身加入日期,我会试试这个非常感谢。
  • 没问题。如果这不是您想要的,请告诉我,我仍然不确定确切的逻辑;)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-06-16
  • 1970-01-01
相关资源
最近更新 更多