【问题标题】:Transpose column to row with Spark使用 Spark 将列转置为行
【发布时间】:2016-10-18 06:25:40
【问题描述】:

我正在尝试将表格的某些列转换为行。 我正在使用 Python 和 Spark 1.5.0。这是我的初始表格:

+-----+-----+-----+-------+
|  A  |col_1|col_2|col_...|
+-----+-------------------+
|  1  |  0.0|  0.6|  ...  |
|  2  |  0.6|  0.7|  ...  |
|  3  |  0.5|  0.9|  ...  |
|  ...|  ...|  ...|  ...  |

我想要这样的东西:

+-----+--------+-----------+
|  A  | col_id | col_value |
+-----+--------+-----------+
|  1  |   col_1|        0.0|
|  1  |   col_2|        0.6|   
|  ...|     ...|        ...|    
|  2  |   col_1|        0.6|
|  2  |   col_2|        0.7| 
|  ...|     ...|        ...|  
|  3  |   col_1|        0.5|
|  3  |   col_2|        0.9|
|  ...|     ...|        ...|

有人知道我能做到吗?感谢您的帮助。

【问题讨论】:

标签: python apache-spark pivot transpose


【解决方案1】:

使用基本的 Spark SQL 函数相对简单。

Python

from pyspark.sql.functions import array, col, explode, struct, lit

df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

to_long(df, ["A"])

斯卡拉

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")

def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip
  require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")      

  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))

  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

toLong(df, Seq("A"))

【讨论】:

  • 我不认为这“相对”简单:)
  • 我收到错误AssertionError: All columns have to be of the same type
  • 如何用 Java 写这个?
  • 如何反其道而行之。如何从第二个数据帧制作第一个数据帧?
  • @ShekharKoirala 这是因为你的数据框中的列是不同的数据类型,在函数代码中明确提到了。
【解决方案2】:

使用函数create_mapexplode 解决pyspark sql 的一种方法。

from pyspark.sql import functions as func
#Use `create_map` to create the map of columns with constant 
df = df.withColumn('mapCol', \
                    func.create_map(func.lit('col_1'),df.col_1,
                                    func.lit('col_2'),df.col_2,
                                    func.lit('col_3'),df.col_3
                                   ) 
                  )
#Use explode function to explode the map 
res = df.select('*',func.explode(df.mapCol).alias('col_id','col_value'))
res.show()

【讨论】:

【解决方案3】:

Spark 局部线性代数库目前非常薄弱:它们不包括上述基本操作。

有一个 JIRA 可以为 Spark 2.1 解决此问题 - 但这对您今天没有帮助。

需要考虑的一点:执行转置可能需要完全打乱数据。

现在您需要直接编写 RDD 代码。我在 scala 中写过 transpose - 但不是在 python 中。这是scala 版本:

 def transpose(mat: DMatrix) = {
    val nCols = mat(0).length
    val matT = mat
      .flatten
      .zipWithIndex
      .groupBy {
      _._2 % nCols
    }
      .toSeq.sortBy {
      _._1
    }
      .map(_._2)
      .map(_.map(_._1))
      .toArray
    matT
  }

因此您可以将其转换为 python 以供您使用。在这个特定时刻,我没有足够的带宽来编写/测试它:如果您无法进行该转换,请告诉我。

至少 - 以下内容很容易转换为python

  • zipWithIndex --> enumerate()(python 等效项 - 归功于 @zero323)
  • map --> [someOperation(x) for x in ..]
  • groupBy --> itertools.groupBy()

这是flatten 的实现,它没有等效的python:

  def flatten(L):
        for item in L:
            try:
                for i in flatten(item):
                    yield i
            except TypeError:
                yield item

因此,您应该能够将它们放在一起以获得解决方案。

【讨论】:

  • 感谢您的回答。我不知道 scala,但我会尝试理解您的代码。我会及时通知您。
  • @Raouf 上面的代码在 python 中都有等价物。如果你很了解python,应该不会有问题。我展示了flatten,这是python中唯一缺少的。让我知道;)
  • zipWithIndex --> enumerate()(Python 等效项)?
  • @zero323 好眼睛!顺便说一句,我要为你的好答案投票。
  • 谢谢。它稍微有点冗长,但不会移动太多数据。
【解决方案4】:

您可以使用 stack 函数:

例如:

df.selectExpr("stack(2, 'col_1', col_1, 'col_2', col_2) as (key, value)")

地点:

  • 2 是要堆叠的列数(col_1 和 col_2)
  • 'col_1' 是键的字符串
  • col_1 是从中获取值的列

如果您有多个列,您可以构建整个 stack 字符串迭代列名并将其传递给 selectExpr

【讨论】:

  • df.selectExpr('column_names_to_keep', 'column_names_to_keep', "stack(2, 'col_1', col_1, 'col_2', col_2) as (key, value)")
  • 你能看看这里吗? stackoverflow.com/questions/67374048/… 我想我因为列名而面临一个问题
  • 我正在使用这个函数,但是遇到了不同数据类型的列。 IE。有些是字符串,有些是十进制。如何使用堆栈将十进制类型转换为字符串?
【解决方案5】:

使用平面图。像下面这样的东西应该可以工作

from pyspark.sql import Row

def rowExpander(row):
    rowDict = row.asDict()
    valA = rowDict.pop('A')
    for k in rowDict:
        yield Row(**{'A': valA , 'colID': k, 'colValue': row[k]})

newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))

【讨论】:

  • 感谢您的回答。但它不起作用。这是我收到的错误消息:TypeError: tuple indices must be integers, not str
【解决方案6】:

我采用了 @javadba 编写的 Scala 答案,并创建了一个 Python 版本,用于转置 DataFrame 中的所有列。这可能与 OP 所要求的有点不同......

from itertools import chain
from pyspark.sql import DataFrame


def _sort_transpose_tuple(tup):
    x, y = tup
    return x, tuple(zip(*sorted(y, key=lambda v_k: v_k[1], reverse=False)))[0]


def transpose(X):
    """Transpose a PySpark DataFrame.

    Parameters
    ----------
    X : PySpark ``DataFrame``
        The ``DataFrame`` that should be tranposed.
    """
    # validate
    if not isinstance(X, DataFrame):
        raise TypeError('X should be a DataFrame, not a %s' 
                        % type(X))

    cols = X.columns
    n_features = len(cols)

    # Sorry for this unreadability...
    return X.rdd.flatMap( # make into an RDD
        lambda xs: chain(xs)).zipWithIndex().groupBy( # zip index
        lambda val_idx: val_idx[1] % n_features).sortBy( # group by index % n_features as key
        lambda grp_res: grp_res[0]).map( # sort by index % n_features key
        lambda grp_res: _sort_transpose_tuple(grp_res)).map( # maintain order
        lambda key_col: key_col[1]).toDF() # return to DF

例如:

>>> X = sc.parallelize([(1,2,3), (4,5,6), (7,8,9)]).toDF()
>>> X.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  7|  8|  9|
+---+---+---+

>>> transpose(X).show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  4|  7|
|  2|  5|  8|
|  3|  6|  9|
+---+---+---+

【讨论】:

【解决方案7】:

一种非常方便的实现方式:

from pyspark.sql import Row

def rowExpander(row):
    rowDict = row.asDict()
    valA = rowDict.pop('A')
    for k in rowDict:
        yield Row(**{'A': valA , 'colID' : k, 'colValue' : row[k]})

    newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander)

【讨论】:

    【解决方案8】:

    为了在pySpark 中转置Dataframe,我在临时创建的列上使用pivot,我在操作结束时删除了该列。

    说,我们有一张这样的桌子。我们要做的是找到每个listed_days_bin 值的所有用户。

    +------------------+-------------+
    |  listed_days_bin | users_count | 
    +------------------+-------------+
    |1                 |            5| 
    |0                 |            2|
    |0                 |            1| 
    |1                 |            3|  
    |1                 |            4| 
    |2                 |            5| 
    |2                 |            7|  
    |2                 |            2|  
    |1                 |            1|
    +------------------+-------------+
    

    创建新的临时列 - 'pvt_value',对其进行聚合并透视结果

    import pyspark.sql.functions as F
    
    
    agg_df = df.withColumn('pvt_value', lit(1))\
            .groupby('pvt_value')\
            .pivot('listed_days_bin')\
            .agg(F.sum('users_count')).drop('pvt_value')
    

    新数据框应如下所示:

    +----+---+---+
    |  0 | 1 | 2 | # Columns 
    +----+---+---+
    |   3| 13| 14| # Users over the bin
    +----+---+---+
    

    【讨论】:

    猜你喜欢
    • 2019-02-20
    • 2021-11-27
    • 1970-01-01
    • 2015-09-08
    • 1970-01-01
    • 2019-02-22
    • 2023-03-27
    • 2021-09-25
    • 2020-10-10
    相关资源
    最近更新 更多