【问题标题】:How to process files with file names in records from Kafka using JavaSparkContext?如何使用 JavaSparkContext 处理来自 Kafka 的记录中具有文件名的文件?
【发布时间】:2017-10-18 01:33:45
【问题描述】:

在我的应用程序中,有Web UI应用程序在完成文件上传过程后将文件路径发送到Kafka。

我有一个 Spark Streaming 应用程序,它使用 JavaSparkContextJavaPairInputDStream 从 Kafka 提取消息(因此它接收文件路径,但也可能有多个文件路径)。

我必须并行处理文件并且需要将结果发送到另一个 kafka 流:

SparkConf conf = new SparkConf().setAppName("Task1").setMaster("local[*]");
    sc = new JavaSparkContext(conf);
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", "localhost:9092");
    Set<String> topics = Collections.singleton("topic1");

    JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, String.class,
            String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

    directKafkaStream.foreachRDD(rdd -> {

        rdd.collect().forEach((t) -> {
            sendMessage(sc, t._2());
        });
    });

    ssc.start();
    ssc.awaitTermination();

sendMessage 将发送文件内的数据。

在上述实现中,我在 foreachRDD 方法中使用 JavaSparkContext,这不是最佳实践。我想并行处理文件。

【问题讨论】:

  • @JacekLaskowski 我认为这只是此代码和问题中的众多问题之一。目前的问题非常广泛,不清楚需要什么或作者不清楚的地方

标签: java apache-spark apache-kafka spark-streaming


【解决方案1】:

我将创建一个函数sendMessage,它将是一个纯 Kafka 生产者(不依赖于 Spark,尤其是 JavaSparkContext),它将向 Kafka 主题发送消息或将所有消息的迭代器带到发送出去。

请参阅official documentation of Apache Kafka

纯 Kafka 生产者为 sendMessage,我将在 Spark Streaming 的转换中执行以下操作(内联的 cmets 应该为您提供一些关于每一行发生了什么的提示):

def sendMessage(message: String) = {
  println(s"Sending $message to Kafka")
}
dstream.map(_.value).foreachRDD { rdd =>
  println(s"Received rdd: $rdd with ${rdd.count()} records")
  // take paths from RDD that contains Kafka records with the file names
  val files = rdd.collect()
  files.foreach { f =>
    // read a file `f` using Spark Core's RDD API
    rdd.sparkContext.textFile(f).map { line =>
      // do something with line
      // this is the place for a pure Spark transformation
      // it's as if you were outside Spark Streaming
      println(line)
      line
    }.foreachPartition { linesAfterProcessingPerPartition =>
      // send lines to Kafka
      // they have been processed using Spark
      linesAfterProcessingPerPartition.foreach { line =>
        sendMessage(message = line)
      }
    }
  }
}

我相信代码会变得更加清晰,但那是 Scala,而你使用 Java,所以我会在这里停下来。


我强烈建议使用Spark SQL's Structured Streaming,因为它很快将取代 Spark Streaming 并成为 Spark 中的流 API。

【讨论】:

    【解决方案2】:

    例如:

    directKafkaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        public void call(JavaRDD<String> stringJavaRDD) throws Exception {
            stringJavaRDD.foreachPartition(new VoidFunction<Iterator<String>>() {
                public void call(Iterator<String> stringIterator) throws Exception {
                    sendMessage(stringIterator);
                }
            });
        }
    

    【讨论】:

    • sendMessage 的 SparkContext 怎么样?
    猜你喜欢
    • 1970-01-01
    • 2022-07-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-09-12
    • 2011-06-15
    • 2019-01-08
    • 1970-01-01
    相关资源
    最近更新 更多