【Question title】: Error when running spark kinesis streaming app in cluster mode (Amazon EMR)
【Posted】: 2017-06-03 00:42:40
【Question description】:

I am having some serious problems trying to run a Spark Kinesis streaming application on an Amazon EMR cluster (1 master node, 3 worker nodes).

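What I am trying to achieve is to build a fat JAR with the sbt-assembly plugin so that I can run a spark-submit command on the master node. The command I am using is the following: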

spark-submit --class MyClass --master yarn --deploy-mode cluster --executor-memory 1g --executor-cores 2 hdfs://url:port/my.jar

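This application receives incoming data from a Kinesis stream and, based on it, performs requests (postbacks) to a URL that I can track. I have already tested the application locally by setting the master to local[cores] on the SparkConf.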

In addition, here are my code and my build.sbt. My project uses Scala 2.11.8 and sbt 0.13.8.

build.sbt

name := "my-app"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.11" % "2.0.0" % "provided",
  "org.apache.spark" % "spark-streaming-kinesis-asl-assembly_2.11" % "2.0.0",
  "org.scalaj" % "scalaj-http_2.11" % "2.3.0"
)

assemblyMergeStrategy in assembly := {
  case PathList("org", "apache", xs @ _*) => MergeStrategy.last
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

MyClass

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.internal.StaticCredentialsProvider
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.keynetic_digital.model.RawKDLog
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.{Duration, Milliseconds, StreamingContext}
import utils.HttpRequest

/**
  * Created by franco on 18/01/17.
  */
object MyClass {

  val awsAccessKeyId : String = "xxxxxxxxx"
  val awsSecretKey : String = "xxxxxxxx"
  val kinesisStreamName : String = "xxxxxxxx"
  val kinesisEndpoint : String = "https://kinesis.us-west-2.amazonaws.com"
  val appName : String = "xxxxxxxxx"


  def main(args: Array[String]): Unit = {
    //Set up Amazon Kinesis Client
    val kinesisClient : AmazonKinesisClient = createKinesisClient

    //Get the number of Kinesis shards
    val shards = kinesisClient.describeStream(kinesisStreamName).getStreamDescription.getShards.size()

    val batchInterval = Milliseconds(5000)

    // Create Spark Streaming Context
    val ssc : StreamingContext = createContext(batchInterval)
    val regionName = RegionUtils.getRegionByEndpoint(kinesisEndpoint).getName

    // Create the Kinesis DStreams
    val kinesisStreams = (0 until shards).map { i =>
      KinesisUtils.createStream(ssc, appName,
        kinesisStreamName,kinesisEndpoint,
        regionName,InitialPositionInStream.LATEST, batchInterval * 2,
        StorageLevel.MEMORY_AND_DISK_2)
    }

    //Union all streams into one DStream
    val stream = ssc.union(kinesisStreams)

    //Get a DStream of Option[RawKDLog] items, dropping records that failed to parse
    val jsons = stream.map(bytes => Option(RawKDLog.fromJson(bytes))).filter(_.isDefined)

    jsons.foreachRDD{rdd =>
      rdd.foreach{log =>
        handleLog(log)
      }
    }

    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }

  def createKinesisClient : AmazonKinesisClient = {
    //Set System Properties for Worker
    System.setProperty("aws.accessKeyId", awsAccessKeyId)
    System.setProperty("aws.secretKey", awsSecretKey)

    //Setting AWS Credentials
    val credentials : BasicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId,awsSecretKey)

    //Setting AWS Credential Provider
    val provider : StaticCredentialsProvider = new StaticCredentialsProvider(credentials)

    //Setting Kinesis Client
    val kinesisClient : AmazonKinesisClient = new AmazonKinesisClient(provider)
    kinesisClient.setEndpoint(kinesisEndpoint)

    kinesisClient
  }

  def createContext(batchInterval : Duration) : StreamingContext = {

    // Create Spark Configuration
    val config = new SparkConf().setAppName(appName)

    // Create Spark Streaming Context
    new StreamingContext(config, batchInterval)
  }

  def handleLog(log : Option[RawKDLog]) : Unit = {
    if(log.isDefined){
      postBack(log.get)
    }
  }

  /**
    * Method that handles url postback requests
    */
  private def postBack(log : RawKDLog) = {
    //TODO url queryString replacement & request masking
    val postBackUrl : String = "url where I can track requests by tailing Nginx log"

    HttpRequest(postBackUrl).asString

  }

}

After submitting the application from the cluster's master node, the following error occurs.

17/01/18 14:39:26 INFO Client: 
     client token: N/A
     diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 74, ip-172-31-42-151.us-west-2.compute.internal): java.lang.NoSuchMethodError: org.apache.spark.storage.BlockManager.get(Lorg/apache/spark/storage/BlockId;)Lscala/Option;
    at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD.getBlockFromBlockManager$1(KinesisBackedBlockRDD.scala:104)
    at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD.compute(KinesisBackedBlockRDD.scala:117)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
     ApplicationMaster host: 175.31.43.46
     ApplicationMaster RPC port: 0
     queue: default
     start time: 1484750075922
     final status: FAILED
     tracking URL: http://ip-175-31-46-219.us-west-2.compute.internal:20888/proxy/application_1484568737877_0012/
     user: hadoop
Exception in thread "main" org.apache.spark.SparkException: Application application_1484568737877_0012 finished with failed status
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:1132)
    at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1178)
    at org.apache.spark.deploy.yarn.Client.main(Client.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
17/01/18 14:39:26 INFO ShutdownHookManager: Shutdown hook called

I would really appreciate any guidance on this topic, as I am new to developing and running Spark applications.

【Comments】:

    Tags: apache-spark cluster-computing spark-streaming amazon-emr


    【Solution 1】:

    The solution to this problem is already described in the question "Spark Streaming - Error when reading from Kinesis".

    What was actually happening is that I had set up my cluster with Amazon EMR 5.2.1, which uses Spark 2.0.2, while my fat JAR bundled spark-streaming-kinesis-asl-assembly_2.11 version 2.0.0.

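    As a result, my application could not read input data from Kinesis: the bundled connector called a BlockManager.get signature that the cluster's Spark did not provide, which is the NoSuchMethodError in the stack trace.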
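    A mismatch like this can be caught at driver startup rather than inside a failed task. The following is only a minimal sketch: `SparkVersionCheck` and its methods are hypothetical names that are not part of the original application, and treating any difference in the major.minor.patch release as incompatible is an assumption made for illustration.

    ```scala
    // Hypothetical helper, not part of the original application.
    object SparkVersionCheck {

      // Matches the leading "major.minor.patch" part of a version string.
      private val Release = """^\d+\.\d+\.\d+""".r

      /** Extracts the leading release number, e.g. "2.0.2" from "2.0.2-SNAPSHOT". */
      def release(version: String): Option[String] =
        Release.findFirstIn(version.trim)

      /** True only when both strings parse and name the same release. */
      def sameRelease(connectorVersion: String, runtimeVersion: String): Boolean =
        (release(connectorVersion), release(runtimeVersion)) match {
          case (Some(a), Some(b)) => a == b
          case _                  => false
        }

      /** Throws IllegalArgumentException with a readable message on mismatch. */
      def requireSameRelease(connectorVersion: String, runtimeVersion: String): Unit =
        require(
          sameRelease(connectorVersion, runtimeVersion),
          s"Kinesis connector built for Spark $connectorVersion, " +
            s"but the cluster runs Spark $runtimeVersion"
        )
    }

    // Possible use at the top of main(), with Spark on the classpath:
    //   SparkVersionCheck.requireSameRelease("2.0.0", org.apache.spark.SPARK_VERSION)
    ```

    With the versions from this question ("2.0.0" bundled, "2.0.2" on the cluster), the check would fail immediately in the driver with a message naming both versions.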

    I switched to EMR 5.0.0, which is the latest release that uses Spark 2.0.0, and everything ran as expected.
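    An alternative to changing the EMR release, which I have not tested here, would be to keep the cluster as it is and align the bundled connector with the cluster's Spark version. The build.sbt fragment below is a sketch under two assumptions: the cluster runs Spark 2.0.2 (as EMR 5.2.1 did in my case), and the connector artifact is published for that same version. `sparkVersion` is a name introduced only for this illustration.

    ```scala
    // build.sbt (sketch): a single value drives every Spark artifact version,
    // so the Kinesis connector cannot drift away from the Spark core version.
    // Assumption: this must equal the Spark version installed on the cluster.
    val sparkVersion = "2.0.2"

    libraryDependencies ++= Seq(
      // Provided by the cluster at runtime, so it is not bundled in the fat JAR.
      "org.apache.spark" % "spark-streaming_2.11" % sparkVersion % "provided",
      // Bundled in the fat JAR; its version must match the cluster's Spark.
      "org.apache.spark" % "spark-streaming-kinesis-asl-assembly_2.11" % sparkVersion,
      "org.scalaj" % "scalaj-http_2.11" % "2.3.0"
    )
    ```

    Either way, the point is the same: the Spark version on the cluster and the version of the Kinesis connector inside the JAR have to agree.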

    【Comments】:
