【Question title】: How to append empty row (for loop output) to a data frame in pyspark
【Posted】: 2021-06-14 08:53:58
【Question description】:

As can be seen, we have two data frames, currdf and hist_df. The logic is defined in the code below, and the expected output is as follows.

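In the expected output, each iteration yields the data frame marked in yellow, which always has either a single row or no rows, and we need to keep appending it to the final data frame. If the data frame (tmp) is empty, only pcode should be stored with its actual value, and the remaining columns should be null.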
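The per-pcode behaviour described above can be modelled in plain Python, without Spark, to make the expected rows explicit. This is only a sketch: the data is copied from currdf and hist_df in the question, the helper name expected_rows is hypothetical, and representing the empty columns as None is an assumption about what "null" should look like in the final data frame.

```python
# Plain-Python model of the expected result (no Spark involved).
# Assumption: columns with no data are represented as None.

curr_pcodes = ["d", "a", "b", "d", "x", "y"]  # pcode column of currdf, in order

hist_rows = [  # (pcode, amt, status1) taken from hist_df
    ("d", 4, 1), ("a", 0, 0), ("b", 4, 1), ("a", 9, 0), ("c", 2, 1),
    ("d", 7, 0), ("b", 3, 0), ("c", 1, 0), ("e", 9, 1), ("f", 6, 1),
]


def expected_rows(pcodes, history):
    """Return one (pcode, amt, status1, status) tuple per input pcode."""
    out = []
    for pcode in pcodes:
        matches = [(amt, st) for p, amt, st in history if p == pcode]
        if not matches:
            # No history for this pcode: keep pcode, leave the rest empty.
            out.append((pcode, None, None, None))
            continue
        amt = sum(a for a, _ in matches)
        status1 = sum(s for _, s in matches)
        status = "Yes" if amt > 3 and status1 > 0 else "No"
        out.append((pcode, amt, status1, status))
    return out


for row in expected_rows(curr_pcodes, hist_rows):
    print(row)
```

The pcodes x and y have no rows in hist_df, so they are the two cases where only pcode is filled in.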

Code to reproduce

import functools
import warnings

import pyspark.sql.functions as sf
from pyspark.sql import DataFrame, SparkSession

warnings.filterwarnings('ignore')

spark = SparkSession.builder.appName('Pycode2Pyspark_RProducer').getOrCreate()

currdf = spark.createDataFrame([(66, "d", "4"),
                                (67, "a", "0"),
                                (70, "b", "4"),
                                (71, "d", "4"),
                                (45, "x", "3"),
                                (48, "y", "3")],
                               ("id", "pcode", "amt"))
currdf.show()

hist_df = spark.createDataFrame([(66, "d", 4, 1),
                                 (67, "a", 0, 0),
                                 (70, "b", 4, 1),
                                 (71, "a", 9, 0),
                                 (45, "c", 2, 1),
                                 (12, "d", 7, 0),
                                 (37, "b", 3, 0),
                                 (89, "c", 1, 0),
                                 (11, "e", 9, 1),
                                 (79, "f", 6, 1)],
                                ("id", "pcode", "amt", "status1"))
hist_df.show()

dataCollect = currdf.collect()
output_dfs = []
for row in dataCollect:
    temp_var = row['pcode']
    print(temp_var)
    temp_filter = hist_df.where(hist_df['pcode'] == temp_var)  # Filter
    temp_filter.show()
    # Aggregate the matching history rows; tmp has one row, or none if pcode is absent from hist_df
    tmp = temp_filter.groupby('pcode').agg(sf.sum('amt').alias('amt'), sf.sum('status1').alias('status1'))
    tmp = tmp.withColumn('status', sf.when((sf.col('amt') > 3) & (sf.col('status1') > 0), 'Yes').otherwise('No'))
    tmp.show()
    output_dfs.append(tmp)
# Stack the per-pcode results into one data frame
df_output = functools.reduce(DataFrame.union, output_dfs)

  

【Question discussion】:

    Tags: python dataframe for-loop pyspark apache-spark-sql


    【Solution 1】:
    import functools
    from pyspark.sql import DataFrame
    dataCollect = currdf.collect()  # currdf, hist_df, sf and spark come from the question's code
    output_dfs = []
    for row in dataCollect:
        temp_var = row['pcode']
        #print(temp_var)
        temp_filter = hist_df.where(hist_df['pcode'] == temp_var)  # Filter
        #temp_filter.show()
        tmp = temp_filter.groupby('pcode').agg(sf.sum('amt').alias('amt'), sf.sum('status1').alias('status1'))
        tmp = tmp.withColumn('status', sf.when((sf.col('amt') > 3) & (sf.col('status1') > 0), 'Yes').otherwise('No'))
        #tmp.show()
        if tmp.rdd.isEmpty():
            # No history for this pcode: build a one-row placeholder that keeps
            # pcode and fills every other column with an empty string.
            columns = tmp.columns
            mylist = ['' for _ in columns]
            mylist[columns.index('pcode')] = temp_var
            newRow = spark.createDataFrame([tuple(mylist)], columns)
            #newRow.show()
            output_dfs.append(newRow)
        else:
            output_dfs.append(tmp)
    # Stack the per-pcode results (real or placeholder) into one data frame
    df_output = functools.reduce(DataFrame.union, output_dfs)
    

    【Discussion】:
