【问题标题】:How to create data frame from list in pyspark without using for loop?如何在不使用 for 循环的情况下从 pyspark 中的列表创建数据框?
【发布时间】:2018-06-04 21:22:16
【问题描述】:

我有如下列表:

rrr=[[(1,(3,1)),(2, (3,2)),(3, (3, 2)),(1,(4,1)),(2, (4,2))]]

df_input = []

接下来我定义了如下标题:

df_header=['sid', 'tid', 'srank']

使用 for 循环将数据附加到空列表中:

for i in rrr:
     for j in i:
            df_input.append((j[0], j[1][0], j[1][1]))
df_input

Output : [(1, 3, 1), (2, 3, 2), (3, 3, 2)]

如下创建数据框:

   df = spark.createDataFrame(df_input, df_header)
    df.show()

+---+---+------+
| sid|tid|srank|
+---+---+------+
|  1|  3|     1|
|  2|  3|     2|
|  3|  3|     2|
+---+---+------+

现在我的问题是如何在不使用任何外部 for 循环(如上)的情况下创建数据框。输入列表包含多于 1 条缺失记录。

【问题讨论】:

  • 从 CSV、JDBC 源、Hive 表、HDFS 上的 parquet/avro 文件中读取。
  • 如果问题是 for 循环,请查看列表推导...
  • 列表理解可能不起作用。我有一个大数据集,所以我不想使用列表来解决这个问题
  • 什么是“1条缺失记录”???
  • 我的原始列表包含多个缺少对象,即间接地我告诉我添加的任何列表对象仅用于示例。

标签: apache-spark pyspark apache-spark-sql spark-dataframe pyspark-sql


【解决方案1】:

当你意识到你的初始列表是一个嵌套的。即作为外部元素的唯一元素的实际列表,那么您会发现解决方案很容易通过仅考虑其第一个(也是唯一的)元素来实现:

spark.version
#  u'2.1.1'

from pyspark.sql import Row

# your exact data:
rrr=[[(1,(3,1)),(2, (3,2)),(3, (3, 2)),(1,(4,1)),(2, (4,2))]]
df_header=['sid', 'tid', 'srank']

df = sc.parallelize(rrr[0]).map(lambda x: Row(x[0], x[1][0],x[1][1])).toDF(schema=df_header)
df.show()
# +---+---+-----+ 
# |sid|tid|srank|
# +---+---+-----+
# |  1|  3|    1|
# |  2|  3|    2|
# |  3|  3|    2|
# |  1|  4|    1| 
# |  2|  4|    2|
# +---+---+-----+

【讨论】:

    【解决方案2】:

    解决方案一:引入 toDF() 转换(但修改输入)

    from pyspark.sql import Row    
    ar=[[1,(3,1)],[2, (3,2)],[3, (3,2)]]
    sc.parallelize(ar).map(lambda x: Row(sid=x[0], tid=x[1][0],srank=x[1][1])).toDF().show()
    
    +---+-----+---+
    |sid|srank|tid|
    +---+-----+---+
    |  1|    1|  3|
    |  2|    2|  3|
    |  3|    2|  3|
    +---+-----+---+
    

    解决方案 2:使用请求的输入矩阵使用列表理解、numpy 展平和重塑

    import numpy as np 
    x=[[(1,(3,1)),(2, (3,2)),(3, (3, 2))]]
    ar=[[(j[0],j[1][0],j[1][1]) for j in i] for i in x]
    flat=np.array(ar).flatten()
    flat=flat.reshape(len(flat)/3, 3)
    sc.parallelize(flat).map(lambda x: Row(sid=int(x[0]),tid=int(x[1]),srank=int(x[2]))).toDF().show()
    
    +---+-----+---+
    |sid|srank|tid|
    +---+-----+---+
    |  1|    1|  3|
    |  2|    2|  3|
    |  3|    2|  3|
    +---+-----+---+
    
    #works also with N,M matrix
    number_columns=3
    x=[[(1,(3,1)),(2, (3,2)),(3, (3, 2))],[(5,(6,7)),(8, (9,10)),(11, (12, 13))]]
    ar=[[(j[0],j[1][0],j[1][1]) for j in i] for i in x]
    flat=np.array(ar).flatten()
    flat=flat.reshape(int(len(flat)/number_columns), number_columns)
    sc.parallelize(flat).map(lambda x: Row(sid=int(x[0]),tid=int(x[1]),srank=int(x[2]))).toDF().show()
    +---+-----+---+
    |sid|srank|tid|
    +---+-----+---+
    |  1|    1|  3|
    |  2|    2|  3|
    |  3|    2|  3|
    |  5|    7|  6|
    |  8|   10|  9|
    | 11|   13| 12|
    +---+-----+---+
    

    【讨论】:

    • 请检查数据格式,"rrr=[[(1,(3,1)),(2, (3,2)),(3, (3, 2))]] " 不像 ar=[[1,(3,1)],[2, (3,2)],[3, (3,2)]]
    • @saikumar 在我持有的第二个例子中。但是我忘记了它的声明。我立即更新
    • 执行后出现错误">>> flat=flat.reshape(len(flat)/3, 3) Traceback(最近一次调用最后):文件“”,第 1 行, 在 TypeError: 'float' 对象不能被解释为整数"
    • 替换 int(len(flat)/3) 以删除错误...但是如果结构发生变化并且平面数组不能被 3 完全整除,这将无法正常工作。主要假设是输入的结构不会改变。
    • @saikumar(实际上是受访者):为什么要使用 numpy 数组、flattenreshape、转换为整数等?第一个干净的解决方案不起作用吗??
    猜你喜欢
    • 2018-03-18
    • 1970-01-01
    • 2023-03-19
    • 1970-01-01
    • 1970-01-01
    • 2021-08-15
    • 1970-01-01
    • 2022-01-22
    • 1970-01-01
    相关资源
    最近更新 更多