【问题标题】:Scala Spark job to upload CSV file data to mongo DB fails due to CodecConfigurationException由于 CodecConfigurationException,Scala Spark 作业将 CSV 文件数据上传到 mongo DB 失败
【发布时间】:2019-06-13 12:54:56
【问题描述】:

我是 spark 和 scala 的新手。我正在尝试使用 Scala 中的 spark 作业将 csv 文件上传到 Mongo DB。

上传时,在作业执行过程中遇到以下错误,

org.bson.codecs.configuration.CodecConfigurationException:找不到类的编解码器。

输入文件的路径将在执行过程中传递。

在过去的 2 天里,我一直在纠结这个问题。感谢您为解决此问题提供任何帮助。

谢谢。

我已经尝试将它上传到弹性搜索,它就像一个魅力。

import org.apache.spark.sql.Row
import com.mongodb.spark._
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.test.Config


object MongoUpload {
    val host = <host>
    val user = <user>
    val pwd = <password>
    val database = <db>
    val collection = <collection>
    val uri = "mongodb://${user}:${pwd}@${host}/"
    val NOW = java.time.LocalDate.now.toString

  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("Mongo-Test-Upload")
      .config("spark.mongodb.output.uri", uri)
      .getOrCreate()

    spark
      .read
      .format("csv")
      .option("header", "true")
      .load(args(0))
      .rdd
      .map(toEligibility)
      .saveToMongoDB(
        WriteConfig(
            Map(
                "uri" -> uri,
                "database" -> database,
                "collection" -> collection
            )
        )
      )
   }


  def toEligibility(row: Row): Eligibility =
    Eligibility(
      row.getAs[String]("DATE_OF_BIRTH"),
      row.getAs[String]("GENDER"),
      row.getAs[String]("INDIVIDUAL_ID"),
      row.getAs[String]("PRODUCT_NAME"),
      row.getAs[String]("STATE_CODE"),
      row.getAs[String]("ZIPCODE"),
      NOW
    )
}

case class Eligibility (
  dateOfBirth: String,
  gender: String,
  recordId: String,
  ProductIdentifier: String,
  stateCode: String,
  zipCode: String,
  updateDate: String
)

Spark 作业失败并出现以下错误,原因是:org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class Eligibility

【问题讨论】:

标签: mongodb scala apache-spark codec


【解决方案1】:

您可以映射到所需格式的 Document 或转换为 Dataset 然后保存,例如:

    import spark.implicits._
    spark
      .read
      .format("csv")
      .option("header", "true")
      .load(args(0))
      .rdd
      .map(toEligibility)
      .toDS()
      .write()
      .format("com.mongodb.spark.sql.DefaultSource")
      .options(Map("uri" -> uri,"database" -> database, "collection" -> collection)
      .save()
   }

【讨论】:

  • 嗨,罗斯,我在 .toDS[Eligibility]() 行中遇到以下错误 - 方法 toDS: ()org.apache.spark.sql.Dataset[com.optum.ohhl.nhi .mongo.BEligibility] 不带类型参数
  • 然后我删除了类型参数并尝试了 .toDS().saveToMongoDB(..),再次出现以下错误,值 saveToMongoDB 不是 org.apache.spark.sql 的成员.数据集
  • 更新了示例 - 现在使用 write() api。您实际上应该能够在原始 csv 数据框中选择您想要的列并使用别名来重命名它们。但这可能是未来的练习。
  • 嗨@Ross,有没有办法可以修改上面的代码以避免重复的条目被插入到mongo DB中?如何根据唯一字段值(例如标识符)限制重复条目?
  • @dwarakeshtp 如果这对你有用,你能接受这个作为答案吗?以便对其他人有所帮助
猜你喜欢
  • 1970-01-01
  • 2017-11-24
  • 2019-01-18
  • 2014-06-29
  • 2016-10-10
  • 2023-03-28
  • 1970-01-01
  • 2018-06-26
  • 2017-09-13
相关资源
最近更新 更多