【问题标题】:Pivot a Pyspark DataFrame to get a MultiColumn旋转 Pyspark DataFrame 以获取 MultiColumn
【发布时间】:2021-03-12 03:51:57
【问题描述】:

我的 Pyspark 数据框如下所示:

+--------+----------+----+----+----+
|latitude| longitude|var1|date|var2|
+--------+----------+----+----+----+
|    3.45|     -8.65|   1|   7|   2|
|   30.45|     45.65|   1|   7|   2|
|   40.45|    123.65|   1|   7|   2|
|   43.45|     13.65|   1|   7|   2|
|   44.45|    -12.65|   1|   7|   2|
|   54.45|   -128.65|   1|   7|   2|
+--------+----------+----+----+----+

但我不知道如何重塑它以仅获取每个日期的寄存器和按该顺序指定 [变量、纬度、经度] 的多列,因此我可以在单独的列中处理变量、纬度和经度的每种组合.

制作这个:

df.select(
    'date',
    *[F.array(F.col(col), F.col('latitude'), F.col('longitude')) for col in var_cols]
).show()

我明白了:

+----+---------------------------------+---------------------------------+
|date|array(var1, latitude, longitude) |array(var2, latitude, longitude) |
+----+---------------------------------+---------------------------------+
|   7|               [1.0, 3.45, -8.65]|               [2.0, 3.45, -8.65]|
|   7|              [1.0, 30.45, 45.65]|              [2.0, 30.45, 45.65]|
|   7|             [1.0, 40.45, 123.65]|             [2.0, 40.45, 123.65]|
|   7|              [1.0, 43.45, 13.65]|              [2.0, 43.45, 13.65]|
|   7|             [1.0, 44.45, -12.65]|             [2.0, 44.45, -12.65]|
|   7|             [1.0, 54.45, -128...|             [2.0, 54.45, -128...|
+----+---------------------------------+---------------------------------+

我想要一个具有单个值(var 的值)的列和一个由纬度和经度的每个值组成的列。想象一下在 pandas 中创建一个 [日期、纬度、经度] 的索引,然后将纬度和经度列拆开。

例如,在熊猫中我会这样做:

df.set_index(["date", "latitude", "longitude"]).unstack().unstack()

【问题讨论】:

  • 你想如何处理这两个变量? [var1, lat, long], [var2,lat,long] 还是 [var1, var2, lat, long]?
  • [var1, lat, long], [var2,lat,long] 方式@mck
  • 所以你想要 3 列,日期,[v1,l,l],[v2,l,l]?
  • 就是这样@mck。

标签: python apache-spark pyspark apache-spark-sql


【解决方案1】:

这个怎么样:

var_cols = [col for col in df.columns if col not in ['date', 'latitude', 'longitude']]

df.withColumn('latlong',
              F.concat_ws('_', F.col('latitude'), F.col('longitude'))) \
  .groupBy('date') \
  .pivot('latlong') \
  .agg(*[F.first(col) for col in var_cols])

【讨论】:

  • 在“latlong”上旋转返回错误:Py4JJavaError:调用 o955.pivot 时发生错误。 : java.lang.RuntimeException: org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78) 上不支持的文字类型类 scala.collection.mutable.WrappedArray$ofRef WrappedArray(3.45, -8.65) ) 在 org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419) 在 org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419) )
  • 您使用的是什么版本的 spark?这适用于 Spark 3.0.0
  • 好像是 issues.apache.org/jira/browse/SPARK-26403 。我猜你正在使用 Spark 2
  • 就是这样。我正在使用 Spark 2
【解决方案2】:

我遇到了这个解决方案:

var_cols = [col for col in df.columns if col not in ['date', 'latitude', 'longitude']]

df = df.withColumn('latlong',F.array(F.col('latitude'), F.col('longitude')))

df = df.withColumn('latlong', F.concat_ws(',', 'latlong'))
df = df.groupBy(["date"]).pivot("latlong").max(*var_cols)

【讨论】:

  • @mck 你能发现这个解决方案有什么问题吗?日期、纬度和经度的组合应该是唯一的,因此 max 聚合函数应该可以正常工作。效率方面有没有提升?
  • 对我来说看起来不错,但是这两行可以合并为一行。我会更新我的答案
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-12-18
相关资源
最近更新 更多