【发布时间】:2019-07-23 19:09:14
【问题描述】:
我有一个数据框。我需要将每条记录转换为 JSON,然后使用 JSON 有效负载调用 API 以将数据插入 postgress。我在数据框中有 14000 条记录并调用 api 并获得响应,这需要 5 小时。有什么方法可以提高性能。下面是我的代码 sn-p。
df_insert = spark.read \
.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("dbtable", "source_table_name") \
.load()
json_insert = df_insert.toJSON().collect()
for row in json_insert:
line = json.loads(row)
headers = {
'Authorization': authorization,
'content-type': "application/json",
'cache-control': "no-cache",
}
response = requests.request("POST", url_insert, data=payload, headers=headers)
print(response.text)
res = response.text
response_result = json.loads(res)
#print(response_result["httpStatus"])
if response_result["message"] == 'success':
print ("INFO : Record inserted successfully")
else:
print ("ERROR : Error in the record")
status_code = response_result["status"]
error_message = response_result["error"]
my_list = [(status_code,error_message,row)]
df = sc.createDataFrame(my_list, ['status', 'error', 'json data'])
df.write.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("dbtable", "error_table") \
.option("header", "true") \
.option("truncate_table", "on") \
.mode("append") \
.save()
注意:我知道通过执行“json_insert = df_insert.toJSON().collect()”我正在失去数据帧的优势。有没有更好的方法来完成。
【问题讨论】:
标签: python apache-spark pyspark aws-glue snowflake-cloud-data-platform