【问题标题】:Ignite cache is empty after save?保存后点燃缓存为空?
【发布时间】:2019-05-13 10:44:04
【问题描述】:

我的数据管道如下:Kafka => 执行一些计算 => 将结果对加载到 Ignite cache => 打印出来

 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("MainApplication");
 JavaSparkContext sc = new JavaSparkContext(conf);
 JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.seconds(10));
 JavaIgniteContext<String, Float> igniteContext = new JavaIgniteContext<>(sc, PATH, false);

 JavaDStream<Message> dStream = KafkaUtils.createDirectStream(
         streamingContext,
         LocationStrategies.PreferConsistent(),
         ConsumerStrategies.<String, Message>
                 Subscribe(Collections.singletonList(TOPIC), kafkaParams)
 )
         .map(ConsumerRecord::value);

 JavaPairDStream<String, Message> pairDStream =
         dStream.mapToPair(message -> new Tuple2<>(message.getName(), message));

 JavaPairDStream<String, Float> pairs = pairDStream
         .combineByKey(new CreateCombiner(), new MergeValue(), new MergeCombiners(), new HashPartitioner(10))
         .mapToPair(new ToPairTransformer());

 JavaIgniteRDD<String, Float> myCache = igniteContext.fromCache(new CacheConfiguration<>());

  // I know that we put something here:
  pairDStream.foreachRDD((VoidFunction<JavaPairRDD<String, Float>>) myCache::savePairs);

  // But I can't see anything here:
  myCache.foreach(tuple2 -> System.out.println("In cache: " + tuple2._1() + " = " + tuple2._2()));

  streamingContext.start();
  streamingContext.awaitTermination();
  streamingContext.stop();
  sc.stop();

但是这段代码什么也没打印出来……为什么?

为什么Ignite cachesavePairs 之后还是空的?

这里有什么问题?

提前致谢!

【问题讨论】:

  • 看起来您在开始将任何数据流式传输到其中之前正在循环访问myCache?如果将 foreach 循环放在 sc.stop() 之前,是否会得到相同的结果?

标签: spark-streaming rdd ignite


【解决方案1】:

对我来说,pairDStream.foreachRDD(...) 看起来像是一个惰性操作,至少在您开始流式传输上下文 streamingContext.start() 之前没有任何影响。 另一方面,myCache.foreach(...) 是急切操作,您在实际为空的缓存上执行它。 因此,请尝试在流式上下文启动后放置myCache.foreach(...)。甚至在终止之后。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-03-26
    • 1970-01-01
    • 2017-11-17
    • 2017-07-03
    • 1970-01-01
    • 1970-01-01
    • 2017-10-15
    相关资源
    最近更新 更多