【问题标题】:Pyspark : Split dataframe based on contents and extract date from bottom line of splitPyspark:根据内容拆分数据框并从拆分的底线提取日期
【发布时间】:2022-01-13 00:29:18
【问题描述】:

我正在将旧文件读入 Dataframe,它看起来像下面这样;

+-----------+----------+----------+--------+--------+--------+
|c1         |      c2  | c3       |    c4  |    c5  |    c6  |
+-----------+----------+----------+--------+--------+--------+
| 01        |  B01     |null      |null    |file1   |B01-01  |
| 06        |  B01     |foo       |bar     |file1   |B01-02  |
| 06        |  B01     |foo       |bar     |file1   |B01-03  |
| 09        |  B01     |2021-12-07|null    |file1   |B01-04  |
| 01        |  B02     |null      |null    |file2   |B02-01  |
| 09        |  B02     |2021-12-07|null    |file2   |B02-02  |
| 01        |  B03     |null      |null    |file3   |B03-01  |
| 06        |  B03     |foo       |bar     |file3   |B03-02  |
| 06        |  B03     |foo       |bar     |file3   |B03-03  |
| 09        |  B03     |2021-12-07|null    |file3   |B03-04  |
| 01        |  B01     |null      |null    |file4   |B01-01  |
| 06        |  B01     |foo       |bar     |file4   |B01-02  |
| 06        |  B01     |foo       |bar     |file4   |B01-03  |
| 09        |  B01     |2021-12-06|null    |file4   |B01-04  |
+-----------+----------+----------+--------+--------+--------+

一个物理文件包含多个逻辑文件,并且有一个 header (01)、detail_rec (06) 和 trail (09)(有时只有 header 和 trail)

我想从预告片中获取每个逻辑分隔的日期,并将其作为一列添加到该记录块中,如下所示。

+-----------+----------+----------+--------+--------+--------+----------+
|c1         |      c2  | c3       |    c4  |    c5  |    c6  | c7       |
+-----------+----------+----------+--------+--------+--------+----------+
| 01        |  B01     |null      |null    |file1   |B01-01  |2021-12-07|
| 06        |  B01     |foo       |bar     |file1   |B01-02  |2021-12-07|
| 06        |  B01     |foo       |bar     |file1   |B01-03  |2021-12-07|
| 09        |  B01     |2021-12-07|null    |file1   |B01-04  |2021-12-07|
| 01        |  B02     |null      |null    |file2   |B02-01  |2021-12-05|
| 09        |  B02     |2021-12-05|null    |file2   |B02-02  |2021-12-05|
| 01        |  B03     |null      |null    |file3   |B03-01  |2021-12-07|
| 06        |  B03     |foo       |bar     |file3   |B03-02  |2021-12-07|
| 06        |  B03     |foo       |bar     |file3   |B03-03  |2021-12-07|
| 09        |  B03     |2021-12-07|null    |file3   |B03-04  |2021-12-07|
| 01        |  B01     |null      |null    |file4   |B01-01  |2021-12-06|
| 06        |  B01     |foo       |bar     |file4   |B01-02  |2021-12-06|
| 06        |  B01     |foo       |bar     |file4   |B01-03  |2021-12-06|
| 09        |  B01     |2021-12-06|null    |file4   |B01-04  |2021-12-06|
+-----------+----------+----------+--------+--------+--------+----------+

我尝试使用 Window 功能使用 unboundedPreceding 和 unboundedFollowing 提取 rowsBetween,但无法到达任何地方。

【问题讨论】:

  • 如何在数据框中唯一标识行?
  • 不幸的是,它不是!
  • 如果我有,它将如何工作?
  • 如果每个逻辑文件都可以通过唯一标识符识别,那么您可以应用带有标识符的自联接并从联接右侧的 df 中选择 c3,这样 c1 就是 @ 987654325@。如果您可以将带有列的问题编辑为唯一的逻辑文件,我可以提供一个工作示例。
  • 添加了一个独特的列,谢谢

标签: apache-spark pyspark apache-spark-sql


【解决方案1】:

您可以从原始df中过滤trailer记录,然后将c3列重命名为c7。最后在文件名列c5上加入原始数据框和过滤后的数据框。


from pyspark.sql import functions as F

data = [("01", "B01", None, None,"file1", "B01-01"),
("06", "B01", "foo", "bar" ,"file1", "B01-02"),
("06", "B01", "foo", "bar" ,"file1", "B01-03"),
("09", "B01", "2021-12-07", None,"file1", "B01-04"),
("01", "B02", None, None,"file2", "B02-01"),
("09", "B02", "2021-12-05", None,"file2", "B02-02"),
("01", "B03", None, None,"file3", "B03-01"),
("06", "B03", "foo", "bar" ,"file3", "B03-02"),
("06", "B03", "foo", "bar" ,"file3", "B03-03"),
("09", "B03", "2021-12-07", None,"file3", "B03-04"),
("01", "B01", None, None,"file4", "B01-01"),
("06", "B01", "foo", "bar" ,"file4", "B01-02"),
("06", "B01", "foo", "bar" ,"file4", "B01-03"),
("09", "B01", "2021-12-06", None,"file4", "B01-04"),]

df = spark.createDataFrame(data, ("c1", "c2", "c3", "c4", "c5", "c6", )) 

df_trailer = df.selectExpr("c5", "c3 as c7").filter(F.col("c1") == "09")

df.join(df_trailer, ["c5"]).show()

输出

+-----+---+---+----------+----+------+----------+
|   c5| c1| c2|        c3|  c4|    c6|        c7|
+-----+---+---+----------+----+------+----------+
|file1| 01|B01|      null|null|B01-01|2021-12-07|
|file1| 06|B01|       foo| bar|B01-02|2021-12-07|
|file1| 06|B01|       foo| bar|B01-03|2021-12-07|
|file1| 09|B01|2021-12-07|null|B01-04|2021-12-07|
|file2| 01|B02|      null|null|B02-01|2021-12-05|
|file2| 09|B02|2021-12-05|null|B02-02|2021-12-05|
|file3| 01|B03|      null|null|B03-01|2021-12-07|
|file3| 06|B03|       foo| bar|B03-02|2021-12-07|
|file3| 06|B03|       foo| bar|B03-03|2021-12-07|
|file3| 09|B03|2021-12-07|null|B03-04|2021-12-07|
|file4| 01|B01|      null|null|B01-01|2021-12-06|
|file4| 06|B01|       foo| bar|B01-02|2021-12-06|
|file4| 06|B01|       foo| bar|B01-03|2021-12-06|
|file4| 09|B01|2021-12-06|null|B01-04|2021-12-06|
+-----+---+---+----------+----+------+----------+

【讨论】:

  • 感谢您抽出宝贵时间帮助我。欣赏!
【解决方案2】:

我可以使用 Window 解决这个问题

from pyspark.sql import functions as sf
from pyspark.sql.window import Window

w = Window.partitionBy('c5').orderBy('c1').rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
new_df = df.withColumn('c7', sf.last('c3').over(w))

根据 c5 创建组,然后选择 c3 的最后一个值。将其添加为新列 c7

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-02-24
    • 1970-01-01
    • 1970-01-01
    • 2016-09-28
    • 2021-10-04
    • 2016-06-24
    • 2022-10-18
    • 2021-09-16
    相关资源
    最近更新 更多