【问题标题】:Spark DirectStream IssueSpark DirectStream 问题
【发布时间】:2017-09-29 01:57:42
【问题描述】:

我正在尝试从 Kafka 创建 Spark Direct Stream,但在创建 directStream 对象时出现错误:

kafkaUtils 类型中的 createDirectStream 方法不适用于(我传递的 HashMap 参数之一)。

在这一行: JavaPairInputDStream directKafkaStream = KafkaUtils.createDirectStream(ssc,String.class, String.class、StringDecoder.class、StringDecoder.class、kafkaParams、主题);

完整代码:

package kafkatest2;



import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.commons.codec.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.kafka010.*;
public class SparkStream {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        // TODO: processing pipeline
       Map<String,String> kafkaParams = new HashMap<String,String>();
       kafkaParams.put("metadata.broker.list", "localhost:9092");

        Set<String> topics = Collections.singleton("topic5");

JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,String.class,
        String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);


        directKafkaStream.foreachRDD(rdd -> {
            System.out.println("--- New RDD with " + rdd.partitions().size()
                    + " partitions and " + rdd.count() + " records");
            rdd.foreach(record -> System.out.println(record._2));
        });
        ssc.start();
        ssc.awaitTermination();
    }
}

【问题讨论】:

  • 你使用的是什么版本的 Spark 和 Kafka?
  • Spark 版本为 2.2.0 和 Kafka 0.11.0.1
  • 哈哈哈..这也发生在我身上.. :) 它编译器抱怨说 methodX ( A, B ) 与 methodsX ( A, B ) 不兼容。大多数时候,类名相同,但包不同。 :P

标签: java apache-spark apache-kafka spark-streaming


【解决方案1】:

在您的代码中,使用了错误的StringDecoder。应该是kafka.serializer.StringDecoder 而不是org.apache.commons.codec.StringDecoder

正确的代码如下:

package kafkatest2;



import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.kafka010.*;
public class SparkStream {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        // TODO: processing pipeline
       Map<String,String> kafkaParams = new HashMap<String,String>();
       kafkaParams.put("metadata.broker.list", "localhost:9092");

        Set<String> topics = Collections.singleton("topic5");

JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,String.class,
        String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);


        directKafkaStream.foreachRDD(rdd -> {
            System.out.println("--- New RDD with " + rdd.partitions().size()
                    + " partitions and " + rdd.count() + " records");
            rdd.foreach(record -> System.out.println(record._2));
        });
        ssc.start();
        ssc.awaitTermination();
    }
}

希望对你有帮助!

【讨论】:

    猜你喜欢
    • 2015-11-05
    • 2015-11-07
    • 2015-02-07
    • 2016-07-19
    • 2018-10-23
    • 1970-01-01
    • 2020-05-27
    • 2015-06-17
    • 2019-01-07
    相关资源
    最近更新 更多