【问题标题】:java.util.concurrent.TimeoutException error in Spark Databricks(Spark Databricks 中的 java.util.concurrent.TimeoutException 错误)
【发布时间】:2022-01-19 20:19:26
【问题描述】:

我使用以下自定义接收器,在 Spark-Scala 中消费来自 RabbitMQ 的数据。下面是我的代码。

/**
 * Starts the receiver by launching a background thread named
 * "Socket Receiver" whose only job is to call `receive()`.
 */
def onStart(): Unit = {
    // Run receive() on its own thread; this method only starts that thread.
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  /**
   * Stops the receiver. Intentionally a no-op: this method holds no
   * resources of its own to release.
   */
  def onStop(): Unit = {
    // There is nothing much to do here: the thread calling receive() is
    // designed to stop by itself by checking isStopped().
    // NOTE(review): that isStopped() check is not visible in this snippet - confirm.
  }

  /**
   * Builds a StreamingContext with a 20-second batch interval, creates a
   * RabbitMQ stream and a CustomReceiver stream, writes every RDD of the
   * latter as parquet (append mode), prints it, then starts the context and
   * blocks on awaitTermination().
   *
   * NOTE(review): the original summary said "Create a socket connection and
   * receive data until receiver is stopped", but no socket is opened in the
   * visible code. The snippet is also truncated: the `try` below has no
   * visible catch/finally and the method's closing braces are missing.
   */
   def receive() {
    // NOTE(review): socket and userInput are never used in the visible code.
    var socket: Socket = null
    var userInput: String = null
    try {
     
     // NOTE(review): this method is what onStart() runs on the receiver thread;
     // building and starting a StreamingContext from inside a receiver looks
     // unintended - confirm this code is not meant to live in the driver/notebook cell.
     var batchInterval = Seconds(20)
              var ssc = new StreamingContext(sc, batchInterval)
      // Connection settings; all empty strings here (presumably redacted - TODO confirm).
      val host = ""
                val port = ""
                val queueName = ""
                val vHost = ""
                val userName = ""
                val password = ""
val maxMessagesPerPartition = "1000"
                val maxReceiveTime = "0.9"
      
        // NOTE(review): every value in this Map is a quoted string literal
        // (e.g. "host" -> "host"), so the vals declared above are NOT passed;
        // the stream is configured with the literal text "host", "port", etc.
        // NOTE(review): receiverStream is not referenced again in the visible code.
        val receiverStream = RabbitMQUtils.createStream(ssc, Map(
      "host" -> "host",
      "port" -> "port",
      "queueName" -> "queueName",
      "vHost" -> "vHost",
      "userName" -> "userName",
      "password" -> "password", 
"maxMessagesPerPartition" -> "maxMessagesPerPartition",
 "maxReceiveTime"   -> "maxReceiveTime"
          ))
     
       // NOTE(review): port is "" above, and "".toInt throws NumberFormatException;
       // presumably the real value was removed before posting - confirm.
       val lines = ssc.receiverStream(new CustomReceiver(host, port.toInt))
    // For each batch RDD: convert to a DataFrame and append it as parquet.
    // NOTE(review): rdd.toDF is called before the `import sqlContext.implicits._`
    // below; a block-local import is only in scope after its own line, so toDF
    // must already be available from an outer scope for this to compile - verify.
    lines.foreachRDD(rdd =>{ val df=rdd.toDF
                        
 
  import sqlContext.implicits._
 
  // "path" is a literal string here (presumably a placeholder - TODO confirm).
  df.write.format("parquet").mode("append").save("path")
  
})

      // Register the print output, then start the context and block the calling thread.
      lines.print()
      ssc.start() 
      ssc.awaitTermination()

我收到以下超时错误。

java.util.concurrent.TimeoutException
at org.apache.spark.util.ThreadUtils$.runInNewThreadWithTimeout(ThreadUtils.scala:351)
    at org.apache.spark.util.ThreadUtils$.runInNewThread(ThreadUtils.scala:283)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:585)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:577)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1212830188116081:87)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-1212830188116081:188)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw$$iw.<init>(command-1212830188116081:190)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw.<init>(command-1212830188116081:192)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw.<init>(command-1212830188116081:194)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw.<init>(command-1212830188116081:196)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read.<init>(command-1212830188116081:198)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$.<init>(command-1212830188116081:202)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$.<clinit>(command-1212830188116081)
    at lined28d5369d60244b0a66d1d87a30c93a027.$eval$.$print$lzycompute(<notebook>:7)
    at lined28d5369d60244b0a66d1d87a30c93a027.$eval$.$print(<notebook>:6)
    at lined28d5369d60244b0a66d1d87a30c93a027.$eval.$print(<notebook>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021)
    at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574)
    at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41)
    at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37)
     
    

这个错误是否意味着当前的 Spark 集群配置无法处理传入的消息负载?它是否与内存问题有关?有人可以帮忙吗?

【问题讨论】:

    标签: scala apache-spark rabbitmq databricks azure-databricks


    【解决方案1】:

    我建议将 spark.driver.memory 增加到更高的值。

    同时尝试增加 broadcastTimeout(即 spark.sql.broadcastTimeout)。

    请参考 T. Gawęda 的 answer

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-11-09
      • 2022-07-16
      • 2023-03-25
      • 2020-11-04
      • 1970-01-01
      相关资源
      最近更新 更多