【问题标题】:spark 2.0 parallel JobProgressListener fails miserablyspark 2.0 并行 JobProgressListener 惨遭失败
【发布时间】:2017-01-28 13:40:01
【问题描述】:

我有一个场景,我需要使用 for 循环并行触发许多 sql 查询并将结果列表收集到 ListBuffer 中。 但是,我在运行循环时遇到了很多错误,并且结果不完整。为了举例说明,我做了一个非常简单的可重现示例:

import scala.collection.mutable.ListBuffer
val dummy = List("a","b").toDF.createOrReplaceTempView("df")
spark.catalog.cacheTable("df")
val dig = (0 to 9).par
var counter = 0:Int
var results = ListBuffer[List[org.apache.spark.sql.Row]]()

for (i1 <- dig ) {
   for (i2 <- dig ) {
     for (i3 <- dig ) {
        println("||==="+i1+"=="+i2+"=="+i3+"===="+(i1*100+i2*10+i3*1)+"===="+counter+"=======||")
        counter +=1
        results += spark.sql("select 'trial','"+i1+"','"+i2+"','"+i3+"','"+(i1*100+i2*10+i3*1)+"','"+counter+"',*  from df ").collect().toList
       }
     }
   }
results(0).take(2).foreach(println)
results.size
results.flatten.size

上面的代码只是从 0 到 999 计数,每个计数将 2 行的列表插入到 ListBuffer 中。表以及用于比较的“串行”计数器值

运行代码结果:

||===9==8==3====983====969=======||
||===9==8==5====985====969=======||
||===9==8==1====981====969=======||
||===9==8==2====982====969=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 784
||===9==8==7====987====974=======||
||===5==8==9====589====975=======||
||===9==8==4====984====976=======||
||===9==8==6====986====976=======||
||===9==8==9====989====977=======||
||===9==8==8====988====977=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 773
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 790
||===5==9==0====590====980=======||
||===5==9==2====592====980=======||
||===5==9==5====595====980=======||
||===5==9==1====591====980=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 795
||===5==9==3====593====984=======||
||===5==9==7====597====985=======||
||===5==9==8====598====985=======||
||===5==9==6====596====987=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 798
||===5==9==9====599====988=======||
||===5==9==4====594====989=======||
||===9==9==0====990====990=======||
||===9==9==5====995====991=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 784
||===9==9==2====992====992=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 789
||===9==9==3====993====993=======||
||===9==9==1====991====994=======||
||===9==9==4====994====995=======||
||===9==9==7====997====996=======||
||===9==9==8====998====997=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 790
||===9==9==6====996====998=======||
||===9==9==9====999====999=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 805
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 798

scala> results(0).take(2).foreach(println)
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 802
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 805
[trial,0,0,0,0,16,a]
[trial,0,0,0,0,16,b]

scala> results.size
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 839
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 840
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 839
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 842
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 855
res3: Int = 1000

scala> results.flatten.size
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 860
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 854
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 860
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 868
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 874
res4: Int = 2000
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 882

scala> 

[Stage 589:=(28 + 0) / 28][Stage 590:>(27 + 1) / 28][Stage 591:>(20 + 7) / 28]16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 888
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 895
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 898
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 898
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 905
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 906
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 907
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 902
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 905
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 913
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 915
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 916
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 913
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 920
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 942
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 946
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 942
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 946
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 948
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 956
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 952
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 965
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 965
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 966
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 976
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 976
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 990
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 999


scala> 

这些只是我得到的一些警告。

您可以看到计数器有时会“错开”

**这是麻烦开始的地方**

大量警告,但 results.size=1000results.flatten.size = 2000 符合预期。

但是尝试以相同的方式计数到 10000 会导致更多警告:

import scala.collection.mutable.ListBuffer
val dummy = List("a","b").toDF.createOrReplaceTempView("df")
spark.catalog.cacheTable("df")
val dig = (0 to 9).par
var counter = 0:Int
var results = ListBuffer[List[org.apache.spark.sql.Row]]()

for (i1 <- dig ) {
   for (i2 <- dig ) {
     for (i3 <- dig ) {
       for (i4 <- dig ) {
         println("||==="+i1+"=="+i2+"=="+i3+"=="+i4+"===="+(i1*1000+i2*100+i3*10+i4*1)+"===="+counter+"=======||")
         counter +=1
         results += spark.sql("select 'trial','"+i1+"','"+i2+"','"+i3+"', '"+i4+"','"+(i1*1000+i2*100+i3*10+i4*1)+"','"+counter+"',*  from df ").collect().toList
       }
     }
   }
 }
results(0).take(2).foreach(println)
results.size
results.flatten.size

输出:

16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8797
||===0==9==4==3====943====9998=======||
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8799
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8801
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8802
||===0==9==4==4====944====9999=======||
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8803
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8804
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8805
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8806

结果:

scala> results(0).take(2).foreach(println)
[trial,3,0,0,0,3000,7,a]
[trial,3,0,0,0,3000,7,b]

scala> results.size
res3: Int = 9999

scala> results.flatten.size
res4: Int = 19998

缺少一个值。

我邀请您尝试以下数到 100000 的代码:

import scala.collection.mutable.ListBuffer
val dummy = List("a","b").toDF.createOrReplaceTempView("df")
spark.catalog.cacheTable("df")
val dig = (0 to 9).par
var counter = 0:Int
var results = ListBuffer[List[org.apache.spark.sql.Row]]()

for (i0 <- dig ) {
  for (i1 <- dig ) {
    for (i2 <- dig ) {
      for (i3 <- dig ) {
        for (i4 <- dig ) {
          println("============="+i0+"=="+i1+"=="+i2+"=="+i3+"=="+i4+"===="+(i0*10000+i1*1000+i2*100+i3*10+i4*1)+"===="+counter+"=========") 
          counter +=1
          results += spark.sql("select 'trial','"+i0+"','"+i1+"','"+i2+"','"+i3+"', '"+i4+"','"+(i0*10000+i1*1000+i2*100+i3*10+i4*1)+"','"+counter+"',*  from df ").collect().toList
        }
      }
    }
  }
}

我不仅在运行期间收到大量的 JobProgressListener 警告,而且结果不完整且不确定:

scala> results(0).take(2).foreach(println)
[trial,8,5,0,0,0,85000,13,a]
[trial,8,5,0,0,0,85000,13,b]

scala> results.size
res3: Int = 99999

scala> results.flatten.size
res4: Int = 192908

在我的现实生活示例中,我经常在运行的随机点收到“spark.sql.execution.id is already set”异常

我该如何解决这个问题?

我试过了

spark.conf.set("spark.extraListeners","org.apache.spark.scheduler.StatsReportListener,org.apache.spark.scheduler.EventLoggingListener")

并阅读Spark 1.6: java.lang.IllegalArgumentException: spark.sql.execution.id is already set

Apache Spark: network errors between executors

http://docs.scala-lang.org/overviews/parallel-collections/overview.html关于副作用操作,但似乎方向太多了。

似乎与这个问题最相关的错误是https://issues.apache.org/jira/browse/SPARK-10548 这应该在 spark 1.6 中解决

谁能提供一些关于解决这种情况的提示?我的真实世界案例的复杂性类似于 100000 计数,并且在随机阶段执行时失败。

我部署了一个 GCS dataproc 集群

gcloud dataproc clusters create clusTest --zone us-central1-b --master-machine-type n1-highmem-16 --num-workers 2 --worker-machine-type n1-highmem-8 --num-worker-local-ssds 2 --num-preemptible-workers 8 --scopes 'https://www.googleapis.com/auth/cloud-platform' --project xyz-analytics

【问题讨论】:

  • 你使用的是什么版本的 Spark?
  • 谷歌 gcs 上的 spark 2.0
  • java.lang.IllegalArgumentException: spark.sql.execution.id 已经设置在 org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:81) 在 org.apache .spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532) 在 org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182) 在 org.apache .spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2187) at ...
  • 对我来说看起来像一个错误。显然,其他一些人也报告在 2.0 (issues.apache.org/jira/browse/SPARK-13747) 中重新看到此问题,即使它被标记为已解决。将不得不挖掘代码以了解根本原因。
  • 是的,我已经看到了这个错误并希望它与此相关。

标签: scala apache-spark parallel-processing apache-spark-sql scala-collections


【解决方案1】:

结果不完整且不确定

非确定性部分应该给出提示。在将结果添加到 ListBuffer 时,您会陷入竞争状态(并行更新并不是真正的线程安全,因此如果运行时间足够长,您最终会丢失一些结果。)

我在本地尝试过,可以重现这个不完整的结果问题。只需添加一个同步块以附加到 Buffer 即可完成结果。您还可以为您的工作使用其他 synchronized 数据结构,因此您无需放置明确的 synchronized 块,例如java.util.concurrent.ConcurrentLinkedQueue 什么的。

所以下面解决了这个问题:

for (i1 <- dig ) {
   for (i2 <- dig ) {
     for (i3 <- dig ) {
       for (i4 <- dig ) {
         counter +=1
         val result = spark.sql("select 'trial','"+i1+"','"+i2+"','"+i3+"', '"+i4+"','"+(i1*1000+i2*100+i3*10+i4*1)+"','"+counter+"',*  from df ").collect().toList
         synchronized {
           results += result
         }
       }
     }
   }
 }

至于“spark.sql.execution.id is already set”异常:我无法用上面给出的示例重现它。 (但是,我在本地 Spark 上运行上述代码。)它是否可以在本地设置上重现?

【讨论】:

    猜你喜欢
    • 2018-04-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-05-11
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多