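【Posted】: 2021-10-13 17:16:23
【Question】:
Goal: retrieve objects from an S3 bucket with a "get" API call, write each retrieved object to Azure Data Lake, and write an error message to Cosmos DB when an error such as a 404 (object not found) occurs.
"my_dataframe" consists of a single column (s3ObjectName) holding object names such as: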
| s3ObjectName |
|---|
| a1.json |
| b2.json |
| c3.json |
| d4.json |
| e5.json |
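import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.functions.lit

// Retry function: runs fn up to n times; after the last failed attempt it
// writes the error to Cosmos DB and rethrows.
// uuid() and ExceptionCfg are assumed to be defined elsewhere in the notebook.
def retry[T](n: Int)(fn: => T): T = {
  Try {
    fn
  } match {
    case Success(x) => x
    case Failure(t: Throwable) => {
      Thread.sleep(1000)
      if (n > 1) {
        retry(n - 1)(fn)
      } else {
        // This is the step that fails with the NullPointerException
        val loggerDf = Seq((t.toString)).toDF("Description")
          .withColumn("Type", lit("Failure"))
          .withColumn("id", uuid())
        loggerDf.write.format("cosmos.oltp").options(ExceptionCfg).mode("APPEND").save()
        throw t
      }
    }
  }
}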
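import java.io.{BufferedWriter, File, FileWriter}
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import org.apache.commons.io.IOUtils // presumably commons-io, given the (stream, charset) call

// Execute the S3 get API call for every object name, one S3 client per partition
my_dataframe.rdd.foreachPartition(partition => {
  val creds = new BasicAWSCredentials(AccessKey, SecretKey)
  val clientRegion: Regions = Regions.US_EAST_1
  val s3client = AmazonS3ClientBuilder.standard()
    .withRegion(clientRegion)
    .withCredentials(new AWSStaticCredentialsProvider(creds))
    .build()
  partition.foreach(x => {
    retry (2) {
      val objectKey = x.getString(0)
      // Read the whole object into a string, then release the S3 stream
      val i = s3client.getObject(s3bucket_name, objectKey).getObjectContent
      val inputS3String = IOUtils.toString(i, "UTF-8")
      i.close()
      // Write the content to the data lake path
      val filePath = s"${data_lake_file_path}"
      val file = new File(filePath)
      val fileWriter = new FileWriter(file)
      val bw = new BufferedWriter(fileWriter)
      bw.write(inputS3String)
      bw.close()
      fileWriter.close()
    }
  })
})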
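Running the code above fails with the following error:
Caused by: java.lang.NullPointerException
The error is raised inside the retry function, at the point where it creates the loggerDf DataFrame and writes it to Cosmos DB.
Is there another way to write the error messages to Cosmos DB?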
【Discussion】:
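One direction worth trying, offered as a sketch rather than a confirmed fix: the NullPointerException is consistent with retry calling toDF and the DataFrame writer inside foreachPartition, i.e. on the executors, where the SparkSession is generally not usable. A variant of retry could return the error message as a value, so that the messages can be gathered and written to Cosmos DB from the driver. The names retryEither, failuresIn, delayMs and fetch below are hypothetical, and the Iterator stands in for one partition so the snippet runs without Spark:

```scala
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Hypothetical variant of retry: after the last failed attempt it returns
  // Left(error description) instead of touching Spark or Cosmos DB.
  def retryEither[T](n: Int, delayMs: Long = 1000)(fn: => T): Either[String, T] =
    Try(fn) match {
      case Success(x) => Right(x)
      case Failure(_) if n > 1 =>
        Thread.sleep(delayMs)
        retryEither(n - 1, delayMs)(fn)
      case Failure(t) => Left(t.toString)
    }

  // Stand-in for the body of one partition: runs fetch for every key and
  // keeps only the error descriptions of the keys that still failed.
  def failuresIn(keys: Iterator[String])(fetch: String => Unit): Iterator[String] =
    keys.flatMap(k => retryEither(2, delayMs = 0)(fetch(k)).swap.toOption)
}
```

In the actual job, something like my_dataframe.rdd.mapPartitions could return these messages in place of foreachPartition, and the existing loggerDf write could then run once on the driver over the collected result. This has not been tested against the setup in the question.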
Tags: scala apache-spark azure-cosmosdb databricks