【发布时间】:2019-02-08 09:17:54
【问题描述】:
我正在为 Spark 基准测试编写一个 Spark 2.4 转换,它将从 Kafka 主题获取 JSON 流,并需要将其转储到 MongoDB。我可以使用 Java MongoClient 来做到这一点,但数据可能很大,例如来自 Kafka 的多个线程的 100 万条记录。 Spark 处理它非常快,但 mongo 写入非常慢。
SparkConf sparkConf = new SparkConf().setMaster("local[*]").
setAppName("JavaDirectKafkaStreaming");
sparkConf.set("spark.streaming.backpressure.enabled","true");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(2));
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", "loacalhost:9092");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("group.id", "2");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("poc-topic");
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(streamingContext,
LocationStrategies.PreferConsistent(),
org.apache.spark.streaming.kafka010.ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
@SuppressWarnings("serial")
JavaPairDStream<String, String> jPairDStream = stream
.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
return new Tuple2<>(record.key(), record.value());
}
});
jPairDStream.foreachRDD(jPairRDD -> {
jPairRDD.foreach(rdd -> {
System.out.println("value=" + rdd._2());
if (rdd._2() != null) {
System.out.println("inserting=" + rdd._2());
Document doc = Document.parse(rdd._2());
// List<Document> list = new ArrayList<>();
// list.add(doc);
db.getCollection("collection").insertOne(doc);
System.out.println("Inserted Data Done");
}
else {
System.out.println("Got no data in this window");
}
});
});
streamingContext.start();
streamingContext.awaitTermination();
在哪里
MongoClient mongo = new MongoClient("localhost", 27017);
MongoDatabase db = mongo.getDatabase("mongodb");
我希望加快 mongo 操作,如何实现 mongo 写入的多线程? (我应该为每个主机使用 MongoClientOptions 吗?)
使用 MongoDriver 的方法也是正确的,或者应该由 MonogSpark 连接器或 Spark writestream() API 完成。如果是,如何在 mongo 中将每个 rdd 作为单独的记录写入 Java 中的任何示例?
【问题讨论】:
-
有什么理由要编写 Spark 代码而不是使用 Kafka Connect 之类的东西?
-
当我的原始数据从 Kafka 流向 Spark 并且数据转换发生在 spark 将结果存储到 mongoDb 时,我需要检查性能。
-
通常人们喜欢使用的模式是 Topic->Transform->New_Topic(例如使用 Kafka Streams API 或 Spark),然后您可以从该新主题发送到许多其他数据源,其中之一可能是 Mongo 并使用 Connect... 换句话说,在这里分离您的担忧。一个应用程序来转换数据。另一个下沉到 Mongo 的应用程序。此外,还有用于 Mongo 的 SparkSQL 编写器,因此如果您使用的是 Spark 2.x,您真的不应该手动使用
foreachRDD -
好的,谢谢,我会检查一下。另外,我使用的是 spark 2.4,你能指出一些 SparkSQL for mongoDB 的例子吗?
-
直接在Mongo网站上...docs.mongodb.com/spark-connector/v1.1/spark-sql
标签: java mongodb apache-spark apache-kafka spark-streaming