【发布时间】:2019-06-13 12:54:56
【问题描述】:
我是 spark 和 scala 的新手。我正在尝试使用 Scala 中的 spark 作业将 csv 文件上传到 Mongo DB。
上传时,在作业执行过程中遇到以下错误,
org.bson.codecs.configuration.CodecConfigurationException:找不到类的编解码器。
输入文件的路径将在执行过程中传递。
在过去的 2 天里,我一直在纠结这个问题。感谢您为解决此问题提供任何帮助。
谢谢。
我已经尝试将它上传到弹性搜索,它就像一个魅力。
import org.apache.spark.sql.Row
import com.mongodb.spark._
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.test.Config
object MongoUpload {
val host = <host>
val user = <user>
val pwd = <password>
val database = <db>
val collection = <collection>
val uri = "mongodb://${user}:${pwd}@${host}/"
val NOW = java.time.LocalDate.now.toString
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("Mongo-Test-Upload")
.config("spark.mongodb.output.uri", uri)
.getOrCreate()
spark
.read
.format("csv")
.option("header", "true")
.load(args(0))
.rdd
.map(toEligibility)
.saveToMongoDB(
WriteConfig(
Map(
"uri" -> uri,
"database" -> database,
"collection" -> collection
)
)
)
}
def toEligibility(row: Row): Eligibility =
Eligibility(
row.getAs[String]("DATE_OF_BIRTH"),
row.getAs[String]("GENDER"),
row.getAs[String]("INDIVIDUAL_ID"),
row.getAs[String]("PRODUCT_NAME"),
row.getAs[String]("STATE_CODE"),
row.getAs[String]("ZIPCODE"),
NOW
)
}
case class Eligibility (
dateOfBirth: String,
gender: String,
recordId: String,
ProductIdentifier: String,
stateCode: String,
zipCode: String,
updateDate: String
)
Spark 作业失败并出现以下错误,原因是:org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class Eligibility
【问题讨论】:
-
不包含任何有关 Spark 作业配置的信息。不过感谢您的回复:)
标签: mongodb scala apache-spark codec