【问题标题】:How to do Kafka-Spark-MongoDb integration efficiently如何高效地进行 Kafka-Spark-MongoDb 集成
【发布时间】:2019-02-08 09:17:54
【问题描述】:

我正在为 Spark 基准测试编写一个 Spark 2.4 转换,它将从 Kafka 主题获取 JSON 流,并需要将其转储到 MongoDB。我可以使用 Java MongoClient 来做到这一点,但数据可能很大,例如来自 Kafka 的多个线程的 100 万条记录。 Spark 处理它非常快,但 mongo 写入非常慢。

            SparkConf sparkConf = new SparkConf().setMaster("local[*]").
            setAppName("JavaDirectKafkaStreaming"); 

    sparkConf.set("spark.streaming.backpressure.enabled","true");
    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("bootstrap.servers", "loacalhost:9092");
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("group.id", "2");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("poc-topic");

    final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(streamingContext,
            LocationStrategies.PreferConsistent(),
            org.apache.spark.streaming.kafka010.ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));

    @SuppressWarnings("serial")
    JavaPairDStream<String, String> jPairDStream = stream
            .mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
                public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                    return new Tuple2<>(record.key(), record.value());
                }
            });

    jPairDStream.foreachRDD(jPairRDD -> {

        jPairRDD.foreach(rdd -> {
            System.out.println("value=" + rdd._2());
            if (rdd._2() != null) {
                System.out.println("inserting=" + rdd._2());

                Document doc = Document.parse(rdd._2());
                // List<Document> list = new ArrayList<>();
                // list.add(doc);
                db.getCollection("collection").insertOne(doc);
                System.out.println("Inserted Data Done");
            }

            else {
                System.out.println("Got no data in this window");
            }

        });
    });
    streamingContext.start();
    streamingContext.awaitTermination();

在哪里

             MongoClient mongo = new MongoClient("localhost", 27017);
             MongoDatabase db = mongo.getDatabase("mongodb");

我希望加快 mongo 操作,如何实现 mongo 写入的多线程? (我应该为每个主机使用 MongoClientOptions 吗?)

使用 MongoDriver 的方法也是正确的,或者应该由 MonogSpark 连接器或 Spark writestream() API 完成。如果是,如何在 mongo 中将每个 rdd 作为单独的记录写入 Java 中的任何示例?

【问题讨论】:

  • 有什么理由要编写 Spark 代码而不是使用 Kafka Connect 之类的东西?
  • 当我的原始数据从 Kafka 流向 Spark 并且数据转换发生在 spark 将结果存储到 mongoDb 时,我需要检查性能。
  • 通常人们喜欢使用的模式是 Topic->Transform->New_Topic(例如使用 Kafka Streams API 或 Spark),然后您可以从该新主题发送到许多其他数据源,其中之一可能是 Mongo 并使用 Connect... 换句话说,在这里分离您的担忧。一个应用程序来转换数据。另一个下沉到 Mongo 的应用程序。此外,还有用于 Mongo 的 SparkSQL 编写器,因此如果您使用的是 Spark 2.x,您真的不应该手动使用 foreachRDD
  • 好的,谢谢,我会检查一下。另外,我使用的是 spark 2.4,你能指出一些 SparkSQL for mongoDB 的例子吗?

标签: java mongodb apache-spark apache-kafka spark-streaming


【解决方案1】:

我不知道“高效”,因为这里有很多因素在起作用。

例如,Kafka 分区和 Spark 执行器总数只是需要调整以适应吞吐量的两个值。

我确实看到您正在使用ForEachWriter,这是一个很好的方法,但考虑到您经常调用insertOne,与开始使用 Spark Structed Streaming 相比,这可能不是最好的方法,阅读来自 Kafka,将您的数据处理成一个 Struct 对象,然后 using SparkSQL Mongo Connector 直接转储到 Mongo 集合(我猜它使用 Mongo 事务,并且一次插入多个记录)


另外值得一提的是,Landoop 提供了一个 MongoDB Kafka Connect Sink,它需要一个配置文件,并且不需要编写 Spark 代码。

【讨论】:

    猜你喜欢
    • 2019-09-05
    • 2019-01-15
    • 2021-02-28
    • 2019-07-28
    • 2017-01-01
    • 2019-06-08
    • 2017-06-30
    • 2017-11-29
    • 2021-02-23
    相关资源
    最近更新 更多