【问题标题】:How to add strings of one columns of the dataframe and form another column that will have the incremental value of the original column如何添加数据框一列的字符串并形成另一列,该列将具有原始列的增量值
【发布时间】:2018-10-07 10:03:42
【问题描述】:

我有一个 DataFrame,我将其数据粘贴在下面:

+---------------+--------------+----------+------------+----------+
|name           |      DateTime|       Seq|sessionCount|row_number|
+---------------+--------------+----------+------------+----------+
|            abc| 1521572913344|        17|           5|         1|
|            xyz| 1521572916109|        17|           5|         2|
|           rafa| 1521572916118|        17|           5|         3|
|             {}| 1521572916129|        17|           5|         4|
|     experience| 1521572917816|        17|           5|         5|
+---------------+--------------+----------+------------+----------+

'name' 列是字符串类型。我想要一个新列"effective_name",它将包含"name" 的增量值,如下所示:

+---------------+--------------+----------+------------+----------+-------------------------+
|name          | DateTime |sessionSeq|sessionCount|row_number |effective_name|
+---------------+--------------+----------+------------+----------+-------------------------+
|abc            |1521572913344 |17        |5           |1         |abc                      |
|xyz            |1521572916109 |17        |5           |2         |abcxyz                   |
|rafa           |1521572916118 |17        |5           |3         |abcxyzrafa               |
|{}             |1521572916129 |17        |5           |4         |abcxyzrafa{}             |
|experience     |1521572917816 |17        |5           |5         |abcxyzrafa{}experience   |
+---------------+--------------+----------+------------+----------+-------------------------+

新列包含 name 列的先前值的增量串联。

【问题讨论】:

  • 您是通过clientDateTime 还是row_number 订购的?有groupBy()s 吗?
  • @Chaitanya 我回滚了你的编辑。不要发帖pictures of code or data
  • 到目前为止你做了什么?
  • @pault- 数据是虚拟的
  • @AshishAcharya 我正在尝试使用滞后函数进行连接

标签: python apache-spark dataframe pyspark


【解决方案1】:

您可以通过使用pyspark.sql.Window 来实现此目的,它按clientDateTimepyspark.sql.functions.concat_wspyspark.sql.functions.collect_list 排序:

import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.orderBy("DateTime")  # define Window for ordering

df.drop("Seq", "sessionCount", "row_number").select(
    "*",
    f.concat_ws(
        "",
        f.collect_list(f.col("name")).over(w)
    ).alias("effective_name")
).show(truncate=False)
#+---------------+--------------+-------------------------+
#|name           |      DateTime|effective_name           |
#+---------------+--------------+-------------------------+
#|abc            |1521572913344 |abc                      |
#|xyz            |1521572916109 |abcxyz                   |
#|rafa           |1521572916118 |abcxyzrafa               |
#|{}             |1521572916129 |abcxyzrafa{}             |
#|experience     |1521572917816 |abcxyzrafa{}experience   |
#+---------------+--------------+-------------------------+

我删除了"Seq""sessionCount""row_number" 以使输出显示更友好。

如果您需要按组执行此操作,可以将 partitionBy 添加到 Window。假设在这种情况下您想按sessionSeq 分组,您可以执行以下操作:

w = Window.partitionBy("Seq").orderBy("DateTime")

df.drop("sessionCount", "row_number").select(
    "*",
    f.concat_ws(
        "",
        f.collect_list(f.col("name")).over(w)
    ).alias("effective_name")
).show(truncate=False)
#+---------------+--------------+----------+-------------------------+
#|name           |      DateTime|sessionSeq|effective_name           |
#+---------------+--------------+----------+-------------------------+
#|abc            |1521572913344 |17        |abc                      |
#|xyz            |1521572916109 |17        |abcxyz                   |
#|rafa           |1521572916118 |17        |abcxyzrafa               |
#|{}             |1521572916129 |17        |abcxyzrafa{}             |
#|experience     |1521572917816 |17        |abcxyzrafa{}experience   |
#+---------------+--------------+----------+-------------------------+

如果你更喜欢使用withColumn,上面的等价于:

df.drop("sessionCount", "row_number").withColumn(
    "effective_name",
    f.concat_ws(
        "",
        f.collect_list(f.col("name")).over(w)
    )
).show(truncate=False)

说明

您希望将一个函数应用于多行,这称为聚合。对于任何聚合,您需要定义要聚合的行和顺序。我们使用Window 执行此操作。在这种情况下,w = Window.partitionBy("Seq").orderBy("DateTime") 将按Seq 对数据进行分区并按DateTime 排序。

我们首先在窗口上应用聚合函数collect_list("name")。这会收集来自name 列的所有值并将它们放入一个列表中。插入的顺序由窗口的顺序定义。

例如,这一步的中间输出是:

df.select(
    f.collect_list("name").over(w).alias("collected")
).show()
#+--------------------------------+
#|collected                       |
#+--------------------------------+
#|[abc]                           |
#|[abc, xyz]                      |
#|[abc, xyz, rafa]                |
#|[abc, xyz, rafa, {}]            |
#|[abc, xyz, rafa, {}, experience]|
#+--------------------------------+

现在适当的值在列表中,我们可以将它们连接在一起,并用一个空字符串作为分隔符。

df.select(
    f.concat_ws(
        "",
        f.collect_list("name").over(w)
    ).alias("concatenated")
).show()
#+-----------------------+
#|concatenated           |
#+-----------------------+
#|abc                    |
#|abcxyz                 |
#|abcxyzrafa             |
#|abcxyzrafa{}           |
#|abcxyzrafa{}experience |
#+-----------------------+

【讨论】:

  • 感谢 Pault ...sessionSeq 在我将编写的最终数据帧中是必需的。我将删除其他两列。我可以在窗口选项中使用 PartitionBy("sessionSeq"),因为我只需要特定的 sessionSeq
  • @ChaitanyaKirty 是的,你可以这样做。我编辑了答案以显示一个示例。
  • 也。我需要 dataframe 中的新列“effective_feature_toggles”。我不应该使用 df.withColumn() 选项吗?
  • 您可以选择 - df = df.select("*", ...)df = df.withColumn(...)。我将添加一个如何使用withColumn 的示例。在这两种情况下,请确保最后没有 .show() - 这只是为了显示并返回 None
  • 年度被低估的答案
【解决方案2】:

解决方案:

将 pyspark.sql.functions 导入为 f

w = Window.partitionBy("Seq").orderBy("DateTime")

df.select( "*", f.concat_ws( "", f.collect_set(f.col("name")).over(w) ).alias("cummuliative_name") ).show()

说明

collect_set() - 这个函数返回类似 [["abc","xyz","rafa",{},"experience"]] 的值。

concat_ws() - 该函数将 collect_set() 的输出作为输入,并将其转换为 abc、xyz、rafa、{}、experience

注意: 如果您没有重复项,请使用 collect_set(),否则使用 collect_list()

【讨论】:

    猜你喜欢
    • 2021-04-18
    • 2016-12-01
    • 2022-08-21
    • 1970-01-01
    • 1970-01-01
    • 2021-12-02
    • 1970-01-01
    • 2021-11-30
    • 1970-01-01
    相关资源
    最近更新 更多