【Posted】: 2021-06-14 08:53:58
【Question】:
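We have two DataFrames, currdf and hist_df. The logic is defined in the code below, and the expected output is:
In the expected output, each iteration produces the yellow-marked DataFrame (tmp), which always has either a single row or no rows. We need to keep appending it to a final DataFrame. If tmp is empty, the row should still appear: only pcode is stored with its actual value, and the remaining columns are null.
Code to reproduce: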
import warnings
warnings.filterwarnings('ignore')

import pyspark.sql.functions as sf
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Pycode2Pyspark_RProducer').getOrCreate()
# Current records: (id, pcode, amt)
currdf = spark.createDataFrame([(66, "d", "4"),
                                (67, "a", "0"),
                                (70, "b", "4"),
                                (71, "d", "4"),
                                (45, "x", "3"),
                                (48, "y", "3")],
                               ("id", "pcode", "amt"))
currdf.show()

# History records: (id, pcode, amt, status1)
hist_df = spark.createDataFrame([(66, "d", 4, 1),
                                 (67, "a", 0, 0),
                                 (70, "b", 4, 1),
                                 (71, "a", 9, 0),
                                 (45, "c", 2, 1),
                                 (12, "d", 7, 0),
                                 (37, "b", 3, 0),
                                 (89, "c", 1, 0),
                                 (11, "e", 9, 1),
                                 (79, "f", 6, 1)],
                                ("id", "pcode", "amt", "status1"))
hist_df.show()
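import functools
from pyspark.sql import DataFrame

dataCollect = currdf.collect()  # bring the currdf rows to the driver
output_dfs = []
for row in dataCollect:
    temp_var = row['pcode']
    print(temp_var)
    # Keep only the history rows that share this pcode
    temp_filter = hist_df.where(hist_df['pcode'] == temp_var)
    temp_filter.show()
    # Aggregate to at most one row for this pcode
    tmp = temp_filter.groupby('pcode').agg(sf.sum('amt').alias('amt'),
                                           sf.sum('status1').alias('status1'))
    tmp = tmp.withColumn('status', sf.when((sf.col('amt') > 3) & (sf.col('status1') > 0), 'Yes').otherwise('No'))
    tmp.show()
    # If no history row matches (e.g. pcode "x" or "y"), tmp is empty,
    # so that pcode does not appear in df_output at all
    output_dfs.append(tmp)

# Union all per-row results into one DataFrame
df_output = functools.reduce(DataFrame.union, output_dfs)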
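This plain-Python sketch (no Spark required) pins down what the loop is meant to produce on the sample rows. It models the described semantics in two steps:

- hist_df is aggregated once per pcode.
- Each currdf row then looks up its pcode. A pcode with no history keeps only pcode, and the other columns are set to None.

The helpers `aggregate_by_pcode` and `expected_rows` are hypothetical names used for illustration only. In PySpark, the analogous loop-free shape would probably be a `groupBy` on hist_df followed by a left join from currdf on pcode (`DataFrame.join(..., on='pcode', how='left')`). That is an assumption about the intended result, not the asker's code.

```python
# Sample rows copied from the currdf / hist_df examples above.
# currdf: (id, pcode, amt); hist_df: (id, pcode, amt, status1)
curr_rows = [(66, "d", "4"), (67, "a", "0"), (70, "b", "4"),
             (71, "d", "4"), (45, "x", "3"), (48, "y", "3")]
hist_rows = [(66, "d", 4, 1), (67, "a", 0, 0), (70, "b", 4, 1), (71, "a", 9, 0),
             (45, "c", 2, 1), (12, "d", 7, 0), (37, "b", 3, 0), (89, "c", 1, 0),
             (11, "e", 9, 1), (79, "f", 6, 1)]


def aggregate_by_pcode(rows):
    """Hypothetical helper: sum amt and status1 per pcode (the groupby/agg step, done once)."""
    totals = {}
    for _id, pcode, amt, status1 in rows:
        amt_sum, status_sum = totals.get(pcode, (0, 0))
        totals[pcode] = (amt_sum + amt, status_sum + status1)
    return totals


def expected_rows(curr, hist):
    """Hypothetical helper: one output row per currdf row, like a left join on pcode."""
    totals = aggregate_by_pcode(hist)
    out = []
    for _id, pcode, _amt in curr:
        if pcode not in totals:
            # No matching history: keep pcode, leave the other columns empty
            out.append((pcode, None, None, None))
            continue
        amt, status1 = totals[pcode]
        status = "Yes" if amt > 3 and status1 > 0 else "No"
        out.append((pcode, amt, status1, status))
    return out


for result_row in expected_rows(curr_rows, hist_rows):
    print(result_row)
```

On the sample data this yields ('d', 11, 1, 'Yes'), ('a', 9, 0, 'No'), ('b', 7, 1, 'Yes'), ('d', 11, 1, 'Yes'), ('x', None, None, None) and ('y', None, None, None). The pcode d appears twice because currdf contains it twice. Aggregating once instead of once per row also avoids launching a separate Spark job for every collected row.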
【Discussion】:
Tags: python dataframe for-loop pyspark apache-spark-sql