【问题标题】:Getting Empty set while reading data from kafka-Spark-Streaming从 kafka-Spark-Streaming 读取数据时设置为空
【发布时间】:2017-01-26 17:02:06
【问题描述】:

您好,我是 Spark Streaming 的新手。我正在尝试读取 xml 文件并将其发送到 kafka 主题。这是我的 Kafka 代码,它将数据发送到 Kafka-console-consumer。

代码:

package org.apache.kafka.Kafka_Producer;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutionException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

@SuppressWarnings("unused")
public class KafkaProducer { 
   private static String sCurrentLine;
   public static void main(String args[]) throws InterruptedException, ExecutionException{ 
       try (BufferedReader br = new BufferedReader(new FileReader("/Users/sreeharsha/Downloads/123.txt")))
       {
           while ((sCurrentLine = br.readLine()) != null) {
               System.out.println(sCurrentLine);
               kafka(sCurrentLine);
           }
       } catch (FileNotFoundException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
       } catch (IOException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();}
   }
   public static void kafka(String sCurrentLine)  {
       Properties props = new Properties();
       props.put("metadata.broker.list", "localhost:9092");
       props.put("serializer.class", "kafka.serializer.StringEncoder");
       props.put("partitioner.class","kafka.producer.DefaultPartitioner");
       props.put("request.required.acks", "1");
       ProducerConfig config = new ProducerConfig(props);
       Producer<String, String> producer = new Producer<String, String>(config);
       producer.send(new KeyedMessage<String, String>("sample",sCurrentLine));
       producer.close();
   }
}

我可以在 Kafka-Console-Consumer 中接收数据。在下面的屏幕截图中,您可以看到我发送到该主题的数据。

现在我需要使用 Spark-Streaming 流式传输我发送到 kafka-console-consumer 的数据。这是代码。

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SparkStringConsumer {

   public static void main(String[] args) {

       SparkConf conf = new SparkConf()
               .setAppName("kafka-sandbox")
               .setMaster("local[*]");
       JavaSparkContext sc = new JavaSparkContext(conf);
       JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

       Map<String, String> kafkaParams = new HashMap<>();
       kafkaParams.put("metadata.broker.list", "localhost:9092");
       Set<String> topics = Collections.singleton("sample");

       JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
       String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
       directKafkaStream.foreachRDD(rdd -> {
       System.out.println("--- New RDD with " + rdd.partitions().size()
           + " partitions and " + rdd.count() + " records");
       rdd.foreach(record -> System.out.println(record._2));
       });
       ssc.start();
       ssc.awaitTermination();
   }
}

像这样提交我的工作时变得空虚:

./spark-submit --class org.apache.spark_streaming.Spark_Kafka_Streaming.SparkStringConsumer --master local[4] Spark_Kafka_Streaming-0.0.1-SNAPSHOT.jar

您可以在下面看到数据如何接收的屏幕截图:

使用以下版本:

火花 - 2.0.0

动物园管理员-3.4.6

卡夫卡 - 0.8.2.1

请给点建议,

【问题讨论】:

  • SparkReceiver 类的代码在哪里?您已经发布了 SparkStringConsumer 类,其中您将主题用作“mytopic”,并且在 KafkaProducer 类中您正在发送关于主题“sample”的消息。你能查一下吗?
  • 现在更新了可以再过一遍吗?
  • 尝试在 kafka 中生成新的消息
  • 在 spark 中读取空集时向 kafka 发送消息不是问题
  • 尝试在你的生产者类中使用它而不是从文件中读取。这是为了测试。随机随机=新随机(); while(true){ kafka("Test- "+random.nextInt(100));线程.sleep(500); } } 还请检查它从哪里解析 SparkStringConsumer 类中的 StringDecoder.class。应该是 import kafka.serializer.StringDecoder;

标签: apache-spark apache-kafka spark-streaming spark-dataframe


【解决方案1】:

终于在网上冲浪后,我找到了这些解决方案。

不要同时使用“Spark-Submit”和“SetMaster”。

  • 如果您从 IDE 运行代码,请在代码中使用 SetMaster
  • 如果您通过“Spark-Submit”运行 jar,请不要将 setMaster 放入您的代码中

还有一件事首先运行/提交您的 spark jar,然后将数据发送到 Kafka-Console-Consumer

工作正常。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-10-19
    • 1970-01-01
    • 2019-06-24
    • 2018-04-04
    • 1970-01-01
    • 2020-06-16
    • 1970-01-01
    • 2019-10-30
    相关资源
    最近更新 更多