Posted: 2022-10-01 16:08:32
Question:
I have the function below. When I uncomment the filter lines, the function never finishes.
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# udf_compare_two_columns and udf_create_objectid are UDFs defined
# elsewhere; their definitions are not shown in this question.
def create_vouchers(df):
    df = df.withColumnRenamed('time_stamp', 'voucher_date')
    # Zero-pad the key parts to fixed widths
    df = df.withColumn('ref_number', F.lpad(df['ref_number'], 10, '0'))
    df = df.withColumn('op_brn_id', F.lpad(df['op_brn_id'], 6, '0'))
    # Voucher key = trx_date + ref_number + op_brn_id
    df = df.withColumn('voucher_id',
                       F.concat_ws("", col('trx_date'), col('ref_number'), col('op_brn_id')))
    # One row per voucher: max of each date/id column, total credit and debit
    df = df.groupby('voucher_id').agg(F.max(df.voucher_date).alias('voucher_date'),
                                      F.max(df.approve_date).alias('approve_date'),
                                      F.max(df.approver_id).alias('approver_id'),
                                      F.max(df.register_date).alias('register_date'),
                                      F.max(df.registerer_id).alias('registerer_id'),
                                      F.sum('c_amount').alias('sum_c'),
                                      F.sum('d_amount').alias('sum_d'))
    # Flag vouchers whose credit and debit totals match
    df = df.withColumn('flag', udf_compare_two_columns(col('sum_c'), col('sum_d')))
    df_true = df.filter(df.flag == True)
    df_false = df.filter(df.flag == False)
    # df = df.select('*').where(col('sum_c') == col('sum_d'))
    df_doc = df_true.withColumn('guid', udf_create_objectid())
    return df_doc

# Plain Python equality check (presumably wrapped as udf_compare_two_columns).
# Note that None == None evaluates to True here.
def compare_two_columns(a, b):
    return a == b
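To see what the grouping key looks like, here is a plain-Python sketch (no Spark needed) of the `voucher_id` expression. It assumes Spark's documented `lpad` behaviour (a null input stays null, a string longer than the target width is cut to that width) and that `concat_ws` skips null parts. The helpers `spark_lpad` and `voucher_id` and the sample values are hypothetical, and `trx_date` is assumed to already be a string such as `"20220827"`.

```python
from typing import Optional


def spark_lpad(value: Optional[object], length: int, pad: str = "0") -> Optional[str]:
    """Hypothetical helper mimicking Spark's lpad with a one-character pad:
    null stays null, short values are left-padded, long values are cut to `length`."""
    if value is None:
        return None
    return str(value).rjust(length, pad)[:length]


def voucher_id(trx_date: Optional[str], ref_number, op_brn_id) -> str:
    """Hypothetical helper rebuilding the key like concat_ws("", ...): null parts are skipped."""
    parts = [trx_date, spark_lpad(ref_number, 10), spark_lpad(op_brn_id, 6)]
    return "".join(p for p in parts if p is not None)


print(voucher_id("20220827", 4521, 12))  # 202208270000004521000012
print(voucher_id("20220827", None, 12))  # 20220827000012
```

If any `ref_number` is longer than 10 characters, the truncation would give distinct references the same key, and the `groupby` would then merge them into one voucher.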
After running the code above, I get this error:
Exception in thread "RemoteBlock-temp-file-clean-thread" java.lang.OutOfMemoryError: Java heap space
22/08/27 09:46:49 ERROR Utils: uncaught error in thread spark-listener-group-appStatus, stopping SparkContext
java.lang.BootstrapMethodError: call site initialization exception
Can you help me figure out what is wrong?
Any help is much appreciated.
Tags: java pyspark apache-spark-sql