【问题标题】:How to pivot a pyspark streaming dataframe如何旋转 pyspark 流数据帧
【发布时间】:2019-12-06 20:19:08
【问题描述】:

我在 pyspark 结构化流中接收流数据,我需要对它们进行旋转,以便我可以从该数据中获得单行。

进入我的集群的数据结构是:

{
"version": 1.0.0,
"message": {
   "data": [{
    "name": "name_1", 
    "value": 1.0},
    ...
   {
    "name": "name_2", 
    "value": 2.0}]
 }
}

我的代码如下:

dfStreaming = spark \
  .readStream \
  .format("eventhubs") \
  .options(**optionConf()) \
  .load() \
  .select(explode("message.data").alias("data")) \
  .select(("data.*")) \

我得到以下结果数据框:

|---------------------|------------------|
|         Name        |       Value      |
|---------------------|------------------|
|        Name_1       |         1.0      |
|---------------------|------------------|
|        Name_2       |         2.0      |
|---------------------|------------------|

但我需要以下结构(它实际上是表格的一个支点):

|---------------------|------------------|
|        Name_1       |      Name_2      |
|---------------------|------------------|
|         1.0         |        2.0       |
|---------------------|------------------|

不允许对流数据帧进行支点,但我想应该有一个解决方案。

非常感谢您的帮助。

【问题讨论】:

    标签: pyspark pivot spark-streaming spark-structured-streaming pyspark-dataframes


    【解决方案1】:

    解决方案是添加几个聚合以及何时重新创建数据框的行。

    dfStreaming = spark \
      .readStream \
      .format("eventhubs") \
      .options(**optionConf()) \
      .load() \
      .select(explode("message.data").alias("data")) \
      .select(("data.*")) \
      .selectexpr(["sum(case when Name=Name_of_desired_column then Value else null) as Name_of_desired_column"])
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-01-04
      • 2021-03-14
      • 2021-05-21
      • 2018-05-15
      • 2020-07-31
      • 2021-03-25
      • 1970-01-01
      相关资源
      最近更新 更多