【问题标题】:Java Spark RDD in an other RDD?另一个RDD中的Java Spark RDD?
【发布时间】:2016-12-23 16:39:03
【问题描述】:

我尝试创建一个 JavaRDD,其中包含其他系列的 RDD。

RDDMachine.foreach(机器 -> startDetectionNow()) 在内部,机器开始查询 ES 并获取另一个 RDD。我收集所有这些(1200 次点击)并隐藏到列表中。机器开始使用此列表后

首先:是否可以这样做?如果没有,我可以通过哪些方式尝试做一些不同的事情?

让我展示一下我的尝试:

        SparkConf conf = new SparkConf().setAppName("Algo").setMaster("local");
    conf.set("es.index.auto.create", "true");
    conf.set("es.nodes", "IP_ES");
    conf.set("es.port", "9200");
    sparkContext = new JavaSparkContext(conf);

    MyAlgoConfig config_algo = new MyAlgoConfig(Detection.byPrevisionMerge);

    Machine m1 = new Machine("AL-27", "IP1", config_algo);
    Machine m2 = new Machine("AL-20", "IP2", config_algo);
    Machine m3 = new Machine("AL-24", "IP3", config_algo);
    Machine m4 = new Machine("AL-21", "IP4", config_algo);

    ArrayList<Machine> Machines = new ArrayList();
    Machines.add(m1);
    Machines.add(m2);
    Machines.add(m3);
    Machines.add(m4);

    JavaRDD<Machine> machineRDD = sparkContext.parallelize(Machines);

    machineRDD.foreach(machine -> machine.startDetectNow());

我尝试在每台必须从 Elasticsearch 中的数据中学习的机器上启动我的算法。


    public boolean startDetectNow()


    // MEGA Requete ELK
    JavaRDD dataForLearn = Elastic.loadElasticsearch(
            Algo.sparkContext
            , "logstash-*/Collector"
            , Elastic.req_AvgOfCall(
                    getIP()
                    , "hour"
                    , "2016-04-16T00:00:00"
                    , "2016-06-10T00:00:00"));

    JavaRDD<Hit> RDD_hits = Elastic.mapToHit(dataForLearn);
    List<Hit> hits = Elastic.RddToListHits(RDD_hits);

所以我尝试在每个“机器”中获取查询的所有数据。 我的问题是:Spark 可以做到这一点吗?或者也许以其他方式? 当我在 Spark 中启动它时;当代码在第二个 RDD 附近时,接缝就像锁一样。

错误信息是:

16/08/17 00:17:13 信息 SparkContext:开始工作:在 Elastic.java:94 收集 16/08/17 00:17:13 INFO DAGScheduler:得到工作 1(在 Elastic.java:94 收集),具有 1 个输出分区 16/08/17 00:17:13 INFO DAGScheduler:最后阶段:ResultStage 1(在 Elastic.java:94 收集) 16/08/17 00:17:13 INFO DAGScheduler: 最后阶段的父母: List() 2017 年 16 月 8 日 00:17:13 信息 DAGScheduler:失踪父母:列表() 16/08/17 00:17:13 INFO DAGScheduler: 提交 ResultStage 1 (MapPartitionsRDD[4] at map at Elastic.java:106),没有丢失的父母 16/08/17 00:17:13 INFO MemoryStore: 块广播_1 作为值存储在内存中(估计大小 4.3 KB,空闲 7.7 KB) 16/08/17 00:17:13 INFO MemoryStore:块 broadcast_1_piece0 以字节形式存储在内存中(估计大小 2.5 KB,空闲 10.2 KB) 16/08/17 00:17:13 INFO BlockManagerInfo:在 localhost:46356 的内存中添加了 broadcast_1_piece0(大小:2.5 KB,免费:511.1 MB) 16/08/17 00:17:13 INFO SparkContext:从 DAGScheduler.scala:1006 的广播创建广播 1 16/08/17 00:17:13 INFO DAGScheduler:从 ResultStage 1 提交 1 个缺失的任务(MapPartitionsRDD[4] 在 Elastic.java:106 的地图上) 16/08/17 00:17:13 INFO TaskSchedulerImpl:添加任务集 1.0 和 1 个任务 ^C16/08/17 00:17:22 信息 SparkContext:从关闭挂钩调用 stop() 16/08/17 00:17:22 信息 SparkUI:在 http://192.168.10.23:4040 停止 Spark Web UI 17 年 8 月 16 日 00:17:22 信息 DAGScheduler:ResultStage 0(GuardConnect.java:60 处的 foreach)在 10,292 秒内失败 17 年 8 月 16 日 00:17:22 信息 DAGScheduler:作业 0 失败:GuardConnect.java:60 的 foreach,耗时 10,470974 秒 线程“主”org.apache.spark.SparkException 中的异常:作业 0 已取消,因为 SparkContext 已关闭 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804) 在 scala.collection.mutable.HashSet.foreach(HashSet.scala:79) 在 org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658) 在 org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) 在 org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581) 在 org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1740) 在 org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229) 在 org.apache.spark.SparkContext.stop(SparkContext.scala:1739) 在 org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:596) 在 org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:267) 在 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:239) 在 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239) 在 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239) 在 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765) 在 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:239) 在 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239) 在 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239) 在 scala.util.Try$.apply(Try.scala:161) 在 org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:239) 在 org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:218) 在 org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54) 在 org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 在 org.apache.spark.rdd.RDD.foreach(RDD.scala:910) 在 org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:332) 在 org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46) 在 com.seigneurin.spark.GuardConnect.main(GuardConnect.java:60) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 在 org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 16/08/17 00:17:22 错误 LiveListenerBus:SparkListenerBus 已经停止!删除事件 SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@4a7e0846) 16/08/17 00:17:22 INFO DAGScheduler:ResultStage 1(在 Elastic.java:94 收集)在 9,301 秒内失败 16/08/17 00:17:22 错误 LiveListenerBus:SparkListenerBus 已经停止!删除事件 SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@6c6b4cb8) 16/08/17 00:17:22 错误 LiveListenerBus:SparkListenerBus 已经停止!丢弃事件 SparkListenerJobEnd(0,1471385842813,JobFailed(org.apache.spark.SparkException: Job 0 由于 SparkContext 被关闭而取消)) 17 年 8 月 16 日 00:17:22 信息 DAGScheduler:作业 1 失败:在 Elastic.java:94 处收集,耗时 9,317650 秒 16/08/17 00:17:22 错误执行程序:阶段 0.0 中的任务 0.0 异常(TID 0) org.apache.spark.SparkException:作业 1 取消,因为 SparkContext 已关闭 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804) 在 scala.collection.mutable.HashSet.foreach(HashSet.scala:79) 在 org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658) 在 org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) 在 org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581) 在 org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1740) 在 org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229) 在 org.apache.spark.SparkContext.stop(SparkContext.scala:1739) 在 org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:596) 在 org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:267) 在 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:239) 在 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239) 在 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239) 在 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765) 在 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:239) 在 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239) 在 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239) 在 scala.util.Try$.apply(Try.scala:161) 在 org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:239) 在 org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:218) 在 org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54) 在 org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) 在 org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 在 org.apache.spark.rdd.RDD.collect(RDD.scala:926) 在 org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:339) 在 org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:46) 在 com.seigneurin.spark.Elastic.RddToListHits(Elastic.java:94) 在 com.seigneurin.spark.OXO.prepareDataAndLearn(OXO.java:126) 在 com.seigneurin.spark.OXO.startDetectNow(OXO.java:148) 在 com.seigneurin.spark.GuardConnect.lambda$main$1282d8df$1(GuardConnect.java:60) 在 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332) 在 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332) 在 scala.collection.Iterator$class.foreach(Ite​​rator.scala:727) 在 org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 在 org.apache.spark.scheduler.Task.run(Task.scala:89) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745) 16/08/17 00:17:22 错误 LiveListenerBus:SparkListenerBus 已经停止!删除事件 SparkListenerJobEnd(1,1471385842814,JobFailed(org.apache.spark.SparkException: Job 1 由于 SparkContext 已关闭而取消)) 16/08/17 00:17:22 信息 MapOutputTrackerMasterEndpoint:MapOutputTrackerMasterEndpoint 停止! 16/08/17 00:17:22 信息 MemoryStore:MemoryStore 已清除 16/08/17 00:17:22 信息 BlockManager:BlockManager 停止 16/08/17 00:17:22 信息 BlockManagerMaster:BlockManagerMaster 停止 16/08/17 00:17:22 信息 OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:OutputCommitCoordinator 停止! 2017 年 16 月 8 日 00:17:22 信息 RemoteActorRefProvider$RemotingTerminator:关闭远程守护程序。 16/08/17 00:17:22 INFO RemoteActorRefProvider$RemotingTerminator:远程守护进程关闭;继续冲洗远程传输。 16/08/17 00:17:22 INFO TaskSetManager:在阶段 1.0 启动任务 0.0(TID 1,本地主机,分区 0,ANY,6751 字节) 16/08/17 00:17:22 错误收件箱:忽略错误 java.util.concurrent.RejectedExecutionException:任务 org.apache.spark.executor.Executor$TaskRunner@65fd4104 从 java.util.concurrent.ThreadPoolExecutor@4387a1bf 被拒绝 [已终止,池大小 = 0,活动线程 = 0,排队任务 = 0 , 完成的任务 = 1] 在 java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 在 java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) 在 java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) 在 org.apache.spark.executor.Executor.launchTask(Executor.scala:128) 在 org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$reviveOffers$1.apply(LocalBackend.scala:86) 在 org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$reviveOffers$1.apply(LocalBackend.scala:84) 在 scala.collection.immutable.List.foreach(List.scala:318) 在 org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:84) 在 org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:69) 在 org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116) 在 org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204) 在 org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) 在 org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745) 16/08/17 00:17:22 INFO SparkContext:成功停止 SparkContext 16/08/17 00:17:22 信息 ShutdownHookManager:已调用关闭挂钩 16/08/17 00:17:22 INFO ShutdownHookManager:删除目录 /tmp/spark-8bf65e78-a916-4cc0-b4d1-1b0ec9a07157 2017 年 8 月 16 日 00:17:22 信息 RemoteActorRefProvider$RemotingTerminator:远程关闭。 16/08/17 00:17:22 INFO ShutdownHookManager:删除目录 /tmp/spark-8bf65e78-a916-4cc0-b4d1-1b0ec9a07157/httpd-6d3aeb80-808c-4749-8f8b-ac9341f990a7

谢谢你能给我一些建议。

【问题讨论】:

  • 我们需要内部异常来提供帮助。这一切都表明您的foreach 存在问题。
  • 嗯,也许是因为我在 Rdd 之后有一段时间(1)?我在想我也许可以用 RDD 线程化工作。我添加了错误的完整消息
  • 一个 RDD 的 RDD 并没有什么意义,但是是的,有一种方法可以欺骗编译器进行编译。

标签: java elasticsearch apache-spark dataset rdd


【解决方案1】:

您不能在 RDD 中创建 RDD,无论 RDD 的类型是什么。 这是第一条规则。这是因为 RDD 是指向您的数据的抽象。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-08-19
    • 1970-01-01
    • 1970-01-01
    • 2016-03-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多