【问题标题】:Exception in thread "RemoteBlock-temp-file-clean-thread" java.lang.OutOfMemoryError: Java heap space线程“RemoteBlock-temp-file-clean-thread”中的异常 java.lang.OutOfMemoryError:Java 堆空间
【发布时间】:2022-10-01 16:08:32
【问题描述】:

我有这个功能;当我取消注释时筛选线;功能未完成。

def create_vouchers(df):
   df = df.withColumnRenamed(\'time_stamp\', \'voucher_date\')
   df = df.withColumn(\'ref_number\', F.lpad(df[\'ref_number\'], 10, \'0\'))
   df = df.withColumn(\'op_brn_id\', F.lpad(df[\'op_brn_id\'], 6, \'0\'))
   df = df.withColumn(\'voucher_id\',
                           F.concat_ws(\"\", col(\'trx_date\'), col(\'ref_number\'), col(\'op_brn_id\')))
df = df.groupby(\'voucher_id\').agg(F.max(df.voucher_date).alias(\'voucher_date\'),                                                                       F.max(df.approve_date).alias(\'approve_date\'),
                                                                     F.max(df.approver_id).alias(\'approver_id\'),

                                                                     F.max(df.register_date).alias(\'register_date\'),
                                                                     F.max(df.registerer_id).alias(\'registerer_id\'),
                                                                     F.sum(\'c_amount\').alias(\'sum_c\'),
                                                                     F.sum(\'d_amount\').alias(\'sum_d\'))
df = df.withColumn(\'flag\',udf_compare_two_columns(col(\'sum_c\'),col(\'sum_d\')))
df_true = df.filter(df.flag==True)
df_false = df.filter(df.flag==False)
# df = df.select(\'*\').where(col(\'sum_c\')==col(\'sum_d\'))
df_doc = df_true.withColumn(\'guid\', udf_create_objectid())
return df_doc

def compare_two_columns(a, b):
    if a == b:
    result = True
else:
    result = False
return result

运行上述代码后,我收到此错误:

  Exception in thread \"RemoteBlock-temp-file-clean-thread\" java.lang.OutOfMemoryError: Java heap space 22/08/27 09:46:49 ERROR Utils: uncaught error in thread spark-listener-group-appStatus, stopping SparkContext java.lang.BootstrapMethodError: call site initialization exception

你能指导我有什么问题吗?

非常感谢任何帮助。

    标签: java pyspark apache-spark-sql


    【解决方案1】:

    问题解决了。我认为最后一个函数占用了很多内存。所以,当我把函数改成这个时,就再也没有错误了。

     def create_vouchers(df):
        df = df.withColumnRenamed('timestamp', 'v_date')
        df = df.withColumn('ref_number', F.lpad(df['ref_number'], 10, '0'))
        df = df.withColumn('op_b_id', F.lpad(df['OP_B_ID'], 6, '0'))   
        df = df.groupby('trx_date', 'ref_number', 'op_b_id', 'v_type').agg(
        F.max(df.v_date).alias('v_date'),
        F.max(df.V_GUID).alias('guid'),
        F.max(df.APP_DATE).alias('app_date'),
        F.max(df.APP_ID).alias('app_id'),
        F.max(df.REG_DATE).alias('reg_date'),
        F.max(df.REG_ID).alias('reg_id'),
        F.sum(df.CRD_AMOUNT).alias('sum_crd'),
        F.sum(df.DBT_AMOUNT).alias('sum_dbt'))
    
        df = df.withColumn('result', col('sum_crd') - col('sum_dbt'))
        df = df.withColumn('v_id',
                       F.concat_ws("", col('trx_date'), col('ref_number'), col('op_b_id'),
                                   col('v_type')))
      df_doc = df.filter((col('result') == 0) | (col('v_type') == '1'))
      df_false = df.filter((col('result') != 0) & (col('v_type') == '2'))
    
      df_error = check_data.filling_df_error(df_false, 'v_id', 'trx', 'InComplete_V')
    
      return df_error, df_doc
    

    我希望它对其他人有用。

    【讨论】:

      猜你喜欢
      • 2016-12-16
      • 2016-07-29
      • 2012-07-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-20
      • 1970-01-01
      • 2010-12-08
      相关资源
      最近更新 更多