【问题标题】:impossible to ingest data in solr with kafka无法使用 kafka 在 solr 中摄取数据
【发布时间】:2019-03-12 06:15:31
【问题描述】:

我正在尝试将带有kafka的数据自动插入到solr和banana,但是因为这个原因对我来说是不可能的

error in #Convert SolrDocuments

java.lang.NumberFormatException:对于输入字符串:“2007” 在 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) 在 java.lang.Integer.parseInt(Integer.java:580) 在 java.lang.Integer.valueOf(Integer.java:766) 在 com.example.streaming.EventParseUtil.convertData(EventParseUtil.java: 24) 在 com.example.streaming.CarEventsProcessor.lambda$main$91ca40fe$1(CarEventsProcessor.java:76) 在 org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.appl y(JavaPairRDD.scala:1015) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala :30) 在 com.lucidworks.spark.SolrSupport$5.call(SolrSupport.java:216) 在 com.lucidworks.spark.SolrSupport$5.call(SolrSupport.java:210) 在 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.app ly(JavaRDDLike.scala:225) 在 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.app ly(JavaRDDLike.scala:225) 在 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$3 5.apply(RDD.scala:927) 在 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$3 5.apply(RDD.scala:927) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.sc ala:1857) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.sc ala:1857) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 在 org.apache.spark.scheduler.Task.run(Task.scala:89) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:247) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor .java:624) 在 java.lang.Thread.run(Thread.java:748) 06 年 18 月 10 日 01:10:08 错误 executor.Executor:阶段 0.0 中的任务 1.0 异常(T ID 1) java.lang.NumberFormatException:对于输入字符串:“2007” 在 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) 在 java.lang.Integer.parseInt(Integer.java:580) 在 java.lang.Integer.valueOf(Integer.java:766) 在 com.example.streaming.EventParseUtil.convertData(EventParseUtil.java: 24) 在 com.example.streaming.CarEventsProcessor.lambda$main$91ca40fe$1(CarEventsProcessor.java:76) 在 org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.appl y(JavaPairRDD.scala:1015) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala :30) 在 com.lucidworks.spark.SolrSupport$5.call(SolrSupport.java:216) 在 com.lucidworks.spark.SolrSupport$5.call(SolrSupport.java:210) 在 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.app ly(JavaRDDLike.scala:225) 在 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.app ly(JavaRDDLike.scala:225) 在 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$3 5.apply(RDD.scala:927) 在 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$3 5.apply(RDD.scala:927) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.sc ala:1857) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.sc ala:1857) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 在 org.apache.spark.scheduler.Task.run(Task.scala:89) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:247) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor .java:624) 在 java.lang.Thread.run(Thread.java:748) 06 年 18 月 10 日 01:10:08 错误 executor.Executor:阶段 0.0 中的任务 0.0 异常(T ID 0) java.lang.NumberFormatException:对于输入字符串:“2007” 在 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) 在 java.lang.Integer.parseInt(Integer.java:580) 在 java.lang.Integer.valueOf(Integer.java:766) 在 com.example.streaming.EventParseUtil.convertData(EventParseUtil.java: 24) 在 com.example.streaming.CarEventsProcessor.lambda$main$91ca40fe$1(CarEventsProcessor.java:76) 在 org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.appl y(JavaPairRDD.scala:1015) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala :30) 在 com.lucidworks.spark.SolrSupport$5.call(SolrSupport.java:216) 在 com.lucidworks.spark.SolrSupport$5.call(SolrSupport.java:210) 在 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.app ly(JavaRDDLike.scala:225) 在 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.app ly(JavaRDDLike.scala:225) 在 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$3 5.apply(RDD.scala:927) 在 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$3 5.apply(RDD.scala:927) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.sc ala:1857) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.sc ala:1857) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 在 org.apache.spark.scheduler.Task.run(Task.scala:89) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:247) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor .java:624) 在 java.lang.Thread.run(Thread.java:748) 06 年 18 月 10 日 01:10:08 错误 spark.SolrSupport:将批次发送到集合 connectedCar
数据失败,原因是:org.apache.solr.common.SolrException:找不到集合:
已连接汽车数据

我附上完整的代码。

有人怀疑失败可能是什么吗?

public class CarEventsProcessor {

 private CarEventsProcessor() {}

 public static void main(String[] args) throws JsonParseException, JsonMappingException, IOException {
  if (args.length < 4) {
   System.err
    .println("Usage: CarEventsProcessor <brokers> <topics> <zk_url> <index_name>\n" +
     "  <brokers> is a list of one or more Kafka brokers\n" +
     "  <topics> is a list of one or more kafka topics to consume from\n" +
     " <zk_url> zookeeper url\n" +
     " <index_name> name of solr index\n\n");
   System.exit(1);
  }

  String brokers = args[0];
  String topics = args[1];
  String zk_url = args[2];
  String index_name = args[3];

  ObjectMapper objectMapper = new ObjectMapper();
  objectMapper.registerModule(new DefaultScalaModule());

  // Create context with a 2 seconds batch interval
  SparkConf sparkConf = new SparkConf()
   .setAppName("CarEventsProcessor");
  sparkConf.setMaster("local[4]");

  JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(60));
  jssc.sparkContext().setLogLevel("ERROR");

  HashSet < String > topicsSet = new HashSet < String > (Arrays.asList(topics.split(",")));
  HashMap < String, String > kafkaParams = new HashMap < String, String > ();
  kafkaParams.put("metadata.broker.list", brokers);


  // Create direct kafka stream with brokers and topics
  JavaPairInputDStream < String, String > messages = KafkaUtils.createDirectStream(jssc, String.class, String.class,
   StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);


  // Get the messages and extract payload
  JavaDStream < String > events = messages
   .map(new Function < Tuple2 < String, String > , String > () {
    @Override
    public String call(Tuple2 < String, String > tuple2) {
     return tuple2._2();
    }
   });

  //convert to SolrDocuments
  JavaDStream < SolrInputDocument > parsedSolrEvents = events.map(incomingRecord -> EventParseUtil.convertData(incomingRecord));

  //send to solr
  SolrSupport.indexDStreamOfDocs(zk_url, index_name, 10, parsedSolrEvents);

  parsedSolrEvents.print();
  jssc.start();
  jssc.awaitTermination();
 }
}

【问题讨论】:

  • 你能发布完整的错误吗?
  • 一般来说,将数据从 Kafka 流式传输到其他系统(反之亦然)的更好方法是使用 Kafka Connect,它是 Apache Kafka 的一部分。如果这有用,我可以扩展它并在答案中发布完整的详细信息。
  • 嗯。你读过那个错误吗?在底部Collection not found connectedCarData

标签: java apache-spark solr apache-kafka


【解决方案1】:

NumberFormatException: For input string: "2007 "... at com.example.streaming.EventParseUtil.convertData(EventParseUtil.java: 24)

您在包含空格的字符串上调用了Integer.parseInt

您必须将字符串参数修剪为该方法。

在错误的底部,您没有找到一个集合。


一般来说,HDP 推荐的在 Solr 和 Kafka 之间获取这些数据的方法是使用 Nifi

【讨论】:

    猜你喜欢
    • 2019-04-13
    • 2018-06-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-09-18
    • 2019-12-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多