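【Posted】: 2020-07-17 22:43:53
【Question】:
I have a DataFrame that I need to filter. Inside the filter function, however, Spark connects to a database for each row. If the database connection fails, I have to write that row to HDFS.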
// filteredRawDf is a DataFrame
val filteredRawDf = dfToReingest.filter { rawRow =>
  // get the database client used for the lookup
  val databaseClient = getDataBaseClient(config)
  // get the primary key from the row
  val requestNumber = rawRow.getAs[Row]("Column1").getAs[String]("Subcolumn")
  // returns the record if the primary key is present, otherwise null
  val requestNumber_srs = databaseClient.getRecord(requestNumber)
  // keep only the rows that have no record in the database
  requestNumber_srs == null
}
If the database is down, the lookup throws an exception. When an exception is thrown, we have to capture the affected rows and save them to HDFS.
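One possible way to approach this, sketched under assumptions rather than offered as a definitive answer: tag every row with the outcome of its lookup instead of filtering directly, then split the tagged rows into "keep" and "lookup failed" sets. The names `ReingestFilter`, `classify`, `split`, `keyOf`, and `newLookup` are hypothetical and do not come from the question. The sketch also assumes that `getRecord` returns a reference type and that the client can be created once per partition on the executor.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.storage.StorageLevel
import scala.util.{Failure, Success, Try}

object ReingestFilter {
  // Outcome tags (illustrative names, plain strings so they serialize trivially).
  val Missing = "missing"            // lookup returned null: keep the row
  val Present = "present"            // a record exists: drop the row
  val LookupFailed = "lookup_failed" // lookup threw: route the row to HDFS

  // Runs one lookup and maps null / non-null / non-fatal exception to a tag.
  def classify(lookup: String => AnyRef, key: String): String =
    Try(lookup(key)) match {
      case Success(null) => Missing
      case Success(_)    => Present
      case Failure(_)    => LookupFailed
    }

  // Returns (rows to keep, rows whose lookup failed).
  // keyOf extracts the primary key; newLookup builds a lookup function
  // (for example around a database client). Both are assumptions of this sketch.
  def split(
      df: DataFrame,
      keyOf: Row => String,
      newLookup: () => String => AnyRef
  ): (DataFrame, DataFrame) = {
    val tagged: RDD[(Row, String)] = df.rdd.mapPartitions { rows =>
      // Create the client once per partition. If creation itself fails,
      // client.get rethrows inside classify, so every row is tagged LookupFailed.
      val client = Try(newLookup())
      rows.map(row => (row, classify(key => client.get.apply(key), keyOf(row))))
    }.persist(StorageLevel.MEMORY_AND_DISK) // so the two splits below normally reuse the lookups

    val spark = df.sparkSession
    val kept = spark.createDataFrame(tagged.filter(_._2 == Missing).map(_._1), df.schema)
    val failed = spark.createDataFrame(tagged.filter(_._2 == LookupFailed).map(_._1), df.schema)
    (kept, failed)
  }
}

// Hypothetical usage with the names from the question; the HDFS path is a placeholder:
// val (filteredRawDf, failedDf) = ReingestFilter.split(
//   dfToReingest,
//   row => row.getAs[Row]("Column1").getAs[String]("Subcolumn"),
//   () => { val c = getDataBaseClient(config); key => c.getRecord(key) }
// )
// failedDf.write.mode("append").parquet("hdfs:///path/to/failed_rows")
```

Going through `df.rdd` avoids needing a `Row` encoder, at the cost of leaving the DataFrame optimizer for this step. Note that `Try` only catches non-fatal exceptions, and that writing to HDFS from inside the filter closure itself would run on the executors once per row, which is why the failed rows are collected into their own DataFrame first.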
【Discussion】:
Tags: scala apache-spark