对于仍在为此苦苦挣扎的人。最后对我有用的是以下配置(设置或配置您的 mongo-spark-connector):
MongoDb 版本 3.4.14;
Spark 2.2.1 版;
斯卡拉版本 2.11.8;
罐子:
mongo-spark-connector_2.11-2.2.1.jar、mongodb-driver-core-3.4.2.jar、mongo-java-driver-3.4.2.jar、bson-3.4.2.jar、
使用正确的 Spark、Scala 版本和正确的 mongo-spark-connector jar 版本显然是关键,包括所有正确的版本mongodb-driver-core、bson 和 mongo-java-driver jar。
我必须检查 Maven Central 上 mongo-spark-connector 版本的 pom.xml 我需要查看哪个版本的 mongo-java-driver需要,然后下载相应的 mongodb-driver-core 和 bson jars。
最后测试:Scala
spark-shell --conf "spark.mongodb.input.uri=mongodb://<username>:<password>@<server>/<database>.<collection>?readPreference=primaryPreferred" --conf "spark.mongodb.output.uri=mongodb://mongodb://<username>:<password>@<server>/<database>.<collection>" --jars <path to jar>/mongo-spark-connector_2.11-2.2.1.jar,<path to jar>/mongodb-driver-core-3.4.2.jar,<path to jar>/bson-3.4.2.jar,<path to jar>/mongo-java-driver-3.4.2.jar
scala> import com.mongodb.spark._
import com.mongodb.spark._
scala> val rdd = MongoSpark.load(sc)
println(rdd.count)
测试 pyspark:使用聚合管道的示例
pyspark --jars <path to jar>/mongo-spark-connector_2.11-2.2.1.jar,<path to jar>/mongodb-driver-core-3.4.2.jar,<path to jar>/bson-3.4.2.jar,<path to jar>/mongo-java-driver-3.4.2.jar
In [1]: stage1="{'$match':{'_id':ObjectId('<SOME UID>')}}"
In [2]: df=spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://<username>:<password>@<server><database>.<collection>").option("pipeline",stage1).load()
In [3]: df.printSchema()