【问题标题】:Writing records to hdfs when catch exception in filter在过滤器中捕获异常时将记录写入hdfs
【发布时间】:2020-07-17 22:43:53
【问题描述】:

我有一个必须过滤的数据框。但是,在过滤器中,火花正在连接到数据库。如果数据库连接失败,我必须将该行写入 hdfs

    //filteredRawDf is dataframe
    val filteredRawDf = dfToReingest.filter { rawRow =>
      // getting.database object to connect 
      val databaseClient = getDataBaseClient(config)

      //getting primary key from row
      val requestNumber = rawRow.getAs[Row]("Column1").getAs[String]("Subcolumn")

      // if primary key is present then it will return record otherwise null
      val requestNumber_srs = databaseClient.getRecord(requestNumber)

      requestNumber_srs == null
    }

如果数据库关闭,那么它将通过异常。如果抛出异常,我们必须获取 Rows 并将其保存到 hdfs 。

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    我可以考虑两个选项(如果我能正确理解您的问题):

    1. 将 _currupted_record 列添加到模式中,然后使用 try catch 放弃您的过滤器代码,如果抛出异常,则使 row[_currupted_record ]=true 并返回 true(您需要不被过滤器删除的记录),然后过滤所有 _currupted_record =true 记录并将它们写入 HDFS。

    2. 使用 try catch 放弃您的过滤器代码,如果抛出异常,则通过 JAVA HDFS FileSystem 将 rawRow 写入 HDFS 并返回 false。 您可以在这里查看如何使用 FileSystem api (How to write to HDFS using Scala) 将文件写入 HDFS

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-11-08
      • 2015-03-01
      • 2011-09-08
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多