【发布时间】:2017-10-18 01:33:45
【问题描述】:
在我的应用程序中,有Web UI应用程序在完成文件上传过程后将文件路径发送到Kafka。
我有一个 Spark Streaming 应用程序,它使用 JavaSparkContext 和 JavaPairInputDStream 从 Kafka 提取消息(因此它接收文件路径,但也可能有多个文件路径)。
我必须并行处理文件并且需要将结果发送到另一个 kafka 流:
SparkConf conf = new SparkConf().setAppName("Task1").setMaster("local[*]");
sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
Set<String> topics = Collections.singleton("topic1");
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, String.class,
String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
directKafkaStream.foreachRDD(rdd -> {
rdd.collect().forEach((t) -> {
sendMessage(sc, t._2());
});
});
ssc.start();
ssc.awaitTermination();
sendMessage 将发送文件内的数据。
在上述实现中,我在 foreachRDD 方法中使用 JavaSparkContext,这不是最佳实践。我想并行处理文件。
【问题讨论】:
-
@JacekLaskowski 我认为这只是此代码和问题中的众多问题之一。目前的问题非常广泛,不清楚需要什么或作者不清楚的地方
标签: java apache-spark apache-kafka spark-streaming