【问题标题】:log error from catch block to cosmos db - spark从 catch 块记录错误到 cosmos db - spark
【发布时间】:2021-10-13 17:16:23
【问题描述】:

目标:- 使用“get”api 调用从 S3 存储桶中检索对象,将检索到的对象写入 azure datalake,并在出现 404s(找不到对象)等错误时将错误消息写入 cosmos DB

“my_dataframe”由一列 (s3ObjectName) 组成,其对象名称如下:-

s3ObjectName
a1.json
b2.json
c3.json
d4.json
e5.json
//retry function that writes cosmos error in event of failure
def retry[T](n: Int)(fn: => T): T = {
  Try {
    return fn
  } match {
    case Success(x) => x
    case Failure(t: Throwable) => {
      Thread.sleep(1000)
      if (n > 1) {
        retry(n - 1)(fn)    
      } else {
        val loggerDf = Seq((t.toString)).toDF("Description")
           .withColumn("Type", lit("Failure"))
           .withColumn("id", uuid())
         loggerDf.write.format("cosmos.oltp").options(ExceptionCfg).mode("APPEND").save()
        throw t
      }
    }
  }
}
 
//execute s3 get api call
my_dataframe.rdd.foreachPartition(partition => {
        val creds = new BasicAWSCredentials(AccessKey, SecretKey)
        val clientRegion: Regions = Regions.US_EAST_1
        val s3client  = AmazonS3ClientBuilder.standard()
        .withRegion(clientRegion)
        .withCredentials(new AWSStaticCredentialsProvider(creds))
        .build()
          partition.foreach(x => {
            retry (2) {
            val objectKey = x.getString(0)
            val i = s3client.getObject(s3bucket_name, objectKey).getObjectContent
            val inputS3String = IOUtils.toString(i, "UTF-8")
            val filePath = s"${data_lake_file_path}"
            val file = new File(filePath)
            val fileWriter = new FileWriter(file)
            val bw = new BufferedWriter(fileWriter)
            bw.write(inputS3String)
            bw.close()
            fileWriter.close()
            }
          })
      })

执行上述操作时会导致以下错误:-

原因:java.lang.NullPointerException

retry函数在要求创建dataframe loggerDf并写入cosmos db时出现此错误

还有其他方法可以将错误消息写入 cosmos DB 吗?

【问题讨论】:

    标签: scala apache-spark azure-cosmosdb databricks


    【解决方案1】:

    也许现在不是使用 spark 的好时机。已经有一些 hadoop 工具来完成这种类型的S3 file transfer using hadoop,它可以完成您正在做的事情,但使用了 hadoop 工具。

    如果您仍然觉得 spark 是正确的工具: 将其拆分为报告问题和数据传输问题。 创建并测试文件列表以查看它们是否有效。编写一个 UDF 来完成创建好/坏文件数据框的繁琐工作。 报告无效的文件。 (致宇宙)

    传输有效的文件。

    【讨论】:

    • 除非执行 s3 get api 调用,这是序列化对象中的一种方法,否则我不会知道记录是否错误。
    【解决方案2】:

    如果您想将错误写入 cosmo DB,您需要使用“带外”方法从执行程序启动连接。(想想:从 partition.foreach 内部启动 jdbc 连接。)

    作为一个较低的标准,如果您想知道它是否发生,您可以使用Accumulators。这不是为记录而设计的,但确实有助于将信息从执行程序传输到驱动程序。这将使您能够将某些内容写回 Cosmos,但实际上是用来简单地计算是否发生了某些事情。 (如果您最终重试执行程序,可能会重复计算,所以它并不完美。)从技术上讲,它可以将信息传输回驱动程序,但只能用于可数的事情。 (如果这种类型的故障非常不规则,它可能是合适的。如果这种情况经常发生,则不适合使用。)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-03-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多