【问题标题】:Performance issue in PySpark/Aws GluePySpark/Aws Glue 中的性能问题
【发布时间】:2019-07-23 19:09:14
【问题描述】:

我有一个数据框。我需要将每条记录转换为 JSON,然后使用 JSON 有效负载调用 API 以将数据插入 postgress。我在数据框中有 14000 条记录并调用 api 并获得响应,这需要 5 小时。有什么方法可以提高性能。下面是我的代码 sn-p。

df_insert = spark.read \
.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("dbtable", "source_table_name") \
.load()

json_insert = df_insert.toJSON().collect()

for row in json_insert:
  line = json.loads(row)
    headers = {
    'Authorization': authorization,
    'content-type': "application/json",
    'cache-control': "no-cache",
    }
  response = requests.request("POST", url_insert, data=payload, headers=headers)
  print(response.text)
  res = response.text
  response_result = json.loads(res)
  #print(response_result["httpStatus"])
  if response_result["message"] == 'success':
      print ("INFO : Record inserted successfully")
  else:
      print ("ERROR : Error in the record")
      status_code = response_result["status"]
      error_message =  response_result["error"]
      my_list = [(status_code,error_message,row)]
      df = sc.createDataFrame(my_list, ['status', 'error', 'json data'])
      df.write.format(SNOWFLAKE_SOURCE_NAME) \
      .options(**sfOptions) \
      .option("dbtable", "error_table") \
      .option("header", "true") \
      .option("truncate_table", "on") \
      .mode("append") \
      .save()

注意:我知道通过执行“json_insert = df_insert.toJSON().collect()”我正在失去数据帧的优势。有没有更好的方法来完成。

【问题讨论】:

    标签: python apache-spark pyspark aws-glue snowflake-cloud-data-platform


    【解决方案1】:

    df_insert.toJSON() 返回一个RDD,您可以在上面使用flatMap1

    source_rdd = df_insert.toJSON()
    

    对这个 RDD 执行 flatMap 并返回一个只包含错误的 RDD。

    headers = {
        'Authorization': authorization,
        'content-type': "application/json",
        'cache-control': "no-cache"
    }
    
    def post_service_error(row):
        # requests package may not be available in the node
        # see about adding files to the spark context
        response = requests.request("POST", url_insert, data=row, headers=headers)
        response_result = response.json()
        if response_result['message'] == 'success':
            print ("INFO : Record inserted successfully")
            return []
        print ("ERROR : Error in the record")
        status_code = response_result["status"]
        error_message =  response_result["error"]
        return [(status_code, error_message, row)]
    
    errors_rdd = source_rdd.flatMap(post_service_error)
    

    将错误 RDD 转换为 spark DataFrame 并将其保存到表中。

    errors_df = sc.createDataFrame(errors_rdd, ['status', 'error', 'json data'])
    (errors_df.write.format(SNOWFLAKE_SOURCE_NAME)
      .options(**sfOptions)
      .option("dbtable", "error_table")
      .option("header", "true")
      .option("truncate_table", "on")
      .mode("append")
      .save())
    

    如果您拥有要向其发出请求的 API,我建议您探索一个接受一批这些对象/数组的实现。 这样,您可以在将每个分区映射到批处理请求之前对 RDD 进行分区,然后再处理错误。

    【讨论】:

    • 我们不拥有 API,我们必须遍历每个 json 对象并调用 api。使用上述方法,当使用 flatMap 调用时,作业不会进入函数“post_service_error”。
    • 如果您不拥有该 API,您可以将其平面映射到错误元组列表,如我在示例中所示。 “工作不在函数内部”——我不确定我完全理解你的意思。您的主控制台或节点控制台中记录了哪些错误?
    • 我从以下代码开始: def post_service_error(row): print ("Inside method .................") print(row) print('Starting job ......') source_rdd.map(post_service_error) print('Jobended .........') 在输出开始作业和结束的作业正在打印但不是“内部方法”
    • 无论您是 flatMap 还是 map,您仍然可以在 Rdd 上调用 collect 来将任务分发到节点并收集任务结果。 mapflatMap 只是声明任务的方式。您在问题中编写的方式是在主节点上运行任务,而不是利用集群中可能拥有的其他节点来运行任务。
    • 是的,由于收集,我无法利用其他节点,因此工作花费了太多时间。因此,为了改善结果,我尝试了您提供的方式。在这里,当我尝试从 flatMap (source_rdd.flatMap(post_service_error)) 调用函数(post_service_error) 时,我什么也没得到。我尝试只在函数内部给出一个打印命令,但它没有打印。对不起,我的沟通不畅
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-12-07
    • 2021-09-13
    • 1970-01-01
    • 2022-01-23
    • 1970-01-01
    • 1970-01-01
    • 2018-01-30
    相关资源
    最近更新 更多