【问题标题】:Do PySpark DataFrames have a "pipe" function like in Pandas?PySpark DataFrames 是否具有像 Pandas 中的“管道”功能?
【发布时间】:2020-08-08 00:40:40
【问题描述】:

例如在 Pandas 中我会这样做

data_df = (
     pd.DataFrame(dict(col1=['a', 'b', 'c'], col2=['1', '2', '3']))
     .pipe(lambda df: df[df.col1 != 'a'])
 )   

这类似于R的管道%>%

PySpark 中有类似的东西吗?

【问题讨论】:

  • 我不这么认为。至少,这不是一个很好的使用方法。在 PySpark 中,您的 DataFrame 分布在多个服务器上。如果有一种行为类似于 Panda 的管道的方法,它需要将来自所有服务器的数据收集到一个单独的服务器中,然后调用 lambda 函数。为什么要使用pipe?你想创建一个新列吗?添加新的?对行/组/整个数据框进行转换或聚合?
  • @MkWTF 此时我真的只想重命名列。就是这样。
  • 你有多种方法可以做到这一点,check this site,学习 spark 真的很好。我也留下pyspark docs here,以备不时之需。
  • @MkWTF 我有几百列,需要在循环中重命名它们,所以在 pandas 中使用pipe(standardize_col_names) 之类的东西是一个很好的情况。我的主要问题是关于@someshwar-kale 回答的pipe。管道是 Spark 中对应的东西

标签: python pandas pyspark


【解决方案1】:

您可以定义一个“类似pandas”的pipe 方法并将其绑定到DataFrame 类:

from pyspark.sql import DataFrame

def pipe(self, func, *args, **kwargs):
    return func(self, *args, **kwargs)

DataFrame.pipe = pipe 

然后,您可以将函数传递给pipe 方法以应用于pyspark DataFrame。例如,假设您想在更改其列之后从 DataFrame my_df 中选择所有列,除了最后两列。您可以为此使用pipe

my_new_df = (
    my_df
    # Perform some operations to add and/or remove columns
    ... 
    # At this point the list of columns is different 
    # from `my_df.columns`
    .pipe(lambda df: df.select(*df.columns[:-2]))
)

【讨论】:

    【解决方案2】:

    我认为,在pyspark 中,您可以借助pipeline 轻松实现此管道功能。

    1. 将每个管道函数转换为转换器。 spark提供了一些预定义的转换器,我们也可以使用它
    2. 使用转换器创建管道
    3. 运行管道以转换提供的数据帧

    Example: Let's take the example you provided

    输入要转换的数据框

     val df = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("col1", "col2")
        df.show(false)
        df.printSchema()
        /**
          * +----+----+
          * |col1|col2|
          * +----+----+
          * |a   |1   |
          * |b   |2   |
          * |c   |3   |
          * +----+----+
          *
          * root
          * |-- col1: string (nullable = true)
          * |-- col2: integer (nullable = false)
          */
    

    1。将每个管道函数转换为转换器

    对于.pipe(lambda df: df[df.col1 != 'a']),我们可以轻松使用spark SQLTransformer。所以不需要创建自定义转换器

    2。使用转换器创建管道

     val transform1 = new SQLTransformer()
          .setStatement("select * from __THIS__ where col1 != 'a'")
        val transform2 = new SQLTransformer()
          .setStatement("select col1, col2, SQRT(col2) as col3 from __THIS__")
    
        val pipeline = new Pipeline()
          .setStages(Array(transform1, transform2))
    

    3。运行管道以转换提供的数据帧

    pipeline.fit(df).transform(df)
          .show(false)
    
        /**
          * +----+----+------------------+
          * |col1|col2|col3              |
          * +----+----+------------------+
          * |b   |2   |1.4142135623730951|
          * |c   |3   |1.7320508075688772|
          * +----+----+------------------+
          */
    

    【讨论】:

    • @0111001101110000 你检查过这个吗?
    • 这让我相信管道是与 pandas 管道功能最好的并行。我想我可以为我的转换制作自己的转换器,但不认为这会增加我的代码的可读性,这是管道函数的要点。
    【解决方案3】:

    在 PySpark 中,管道函数称为转换,文档为 here

    行为与 Pandas 管道运算符相同。

    所以 PySpark 中的示例看起来像

    data_df = (
      spark.createDataFrame(pd.DataFrame(dict(col1=['a', 'b', 'c'], col2=['1', '2', '3'])))
      .transform(lambda df: df.filter("col1 != 'a'"))
    )
    

    【讨论】:

    • 请注意transform 是在 Spark 3.0 中实现的
    • 这是currently working linkDataFrame.transform 文档。
    • pyspark.sql.DataFrame.transform 只接受并返回一个 Dataframe,而使用来自 @luiz-otavio-v-b-oliveira 的管道函数扩展 Dataframe 也可以采用任意参数。
    猜你喜欢
    • 2022-08-12
    • 2016-12-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-14
    • 1970-01-01
    相关资源
    最近更新 更多