【问题标题】:Kinesis Streaming Example Not Working in Cluster Mode on EMR (EMR 4.3, Spark 1.6)Kinesis Streaming 示例在 EMR(EMR 4.3、Spark 1.6)上的集群模式下不工作
【发布时间】:2016-03-06 15:40:52
【问题描述】:

我尝试在 EMR 4.3 上以集群模式运行 Kinesis Streaming 字数统计示例,但未成功。具体来说,即使我可以访问流的元数据,也没有从 Kinesis 读取任何消息。

相同的代码确实在相同的 EMR 集群上以客户端模式运行(即使用“local[*]”),但是当我尝试在集群模式下执行此操作时,kinesis 接收器的第一个作业卡住了:

我在 Spark UI 的流页面中什么也看不到:

最初我认为这是资源/线程数的问题,但根据配置以及我在 YARN 和 Spark UI 中看到的内容,情况似乎并非如此(请参阅下面的所有相关配置)。

我正在寻找有关应用程序为何无法从 Kinesis 读取数据的任何指示,或者寻找建议的配置或设置更改以使其在集群模式下工作。


配置和设置细节

相关的 Kinesis 流有一个 Shard。

我在 EMR 集群设置中使用如下配置:

[{"classification":"capacity-scheduler",

"properties":{"yarn.scheduler.capacity.resource-calculator":"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"}},

{"classification":"spark","properties":{"maximizeResourceAllocation":"true"}},

{"classification":"spark-defaults","properties":{

"spark.executor.instances":"0",
"spark.dynamicAllocation.enabled":"true"}}]

这就是我的环境设置在 spark ui 中的样子:

我尝试运行的代码:

 val appName =  "ks_"+DateTime.now().toString(formatter);
 val sparkConf = new SparkConf().setAppName(appName)


val sc = new SparkContext(sparkConf);
val batchIntervalInSec = 5
val batchInterval = Seconds(batchIntervalInSec)     

val ssc = new StreamingContext(sc, batchInterval)
ssc.checkpoint("/checkpoint")
 val kinesisClient = new AmazonKinesisClient(credentials)
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size

val numStreams = numShards

val kinesisCheckpointInterval = Seconds(batchIntervalInSec-1) 
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()


 val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
    InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_ONLY)
}

val unionStreams = ssc.union(kinesisStreams)

val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))


val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

wordCounts.print()
ssc.start()
ssc.awaitTermination();

这是我运行的 spark 命令:

spark-submit --deploy-mode cluster 
--class com.komoona.spark.kmn_spark_scala.KinesisStream 
--master yarn --conf spark.executor.cores=4 
--conf spark.executor.instances=2 
--conf spark.streaming.blockInterval=1000ms 
 --jars /home/hadoop/lib/spark-streaming-kinesis-asl-assembly_2.10-1.6.0.jar,/home/hadoop/lib/amazon-kinesis-client-1.6.1.jar, 
test_app_full.jar

编辑: 我注意到尽管配置了 2 个执行器(如命令行中指定的那样),但在 spark UI 中只显示了一个执行器和驱动程序运行:

这可能是问题的根源吗?有什么想法会导致这种情况吗?

【问题讨论】:

    标签: apache-spark spark-streaming amazon-emr amazon-kinesis


    【解决方案1】:

    我在使用 Spark + Kinesis + EMR 聚合 kinesis 流时遇到了同样的问题(测试了多个版本)......事实证明,即使 Kinesis 库是使用 protobuf-java-2.6.1 显式构建的(由于 KCL 依赖性而需要) , EMR 集群的配置方式使其在实践中仍然使用protobuf-java-2.5.0

    我无法仔细观察以弄清楚为什么会发生这种情况,但我快速而肮脏的解决方案是删除 /usr/lib/spark/jars/protobuf-java-2.5.0.jar 并在同一位置(在主节点上)用我自己的 protobuf-java-2.6.1 替换.我在 s3 中保留一个版本,并有一个引导操作将 aws s3 cp2.6.1 jar 向下移动到 /usr/lib/spark/jars 中的正确位置,然后将以下内容添加到您的 spark-submit(在适当的情况下替换 scala 和 spark 版本):

    --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0,\ com.google.protobuf:protobuf-java-2.6.1

    比我更聪明的人可能会找到比这更正确的解决方案,但我还没有看到删除 protobuf-java-2.5.0 有任何明显的副作用,但这并不意味着它们不存在。

    您可以通过使用 --master local[*] 而不是 --master yarn 运行(在您的 spark master 上)检查这是否与您遇到的问题相同,并在日志中查找以下内容:

    17/01/31 19:24:13 错误 Worker:Worker.run 捕获异常,休眠 1000 毫秒! java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: com.google.protobuf.LazyStringList.getUnmodifiableView()Lcom/google/protobuf/LazyStringList;

    TL;DR -

    在 spark master 上的 /usr/lib/spark/jars/ 中将 protobuf-java-2.5.0.jar 替换为 protobuf-java-2.6.1.jar

    添加(替换 scala 和 spark 版本)

    --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0,\ com.google.protobuf:protobuf-java-2.6.1 到你的 spark-submit 命令行

    【讨论】:

      【解决方案2】:

      这对我有用。你有机会看看吗-

      Spark not able to fetch events from Amazon Kinesis

      TL;DR

      有 2 个版本的 foreachRDD 可用

      unionStreams.foreachRDD
      unionStreams.foreachRDD ((rdd:RDD[Array[Byte]], time: Time)
      

      由于某种原因,第一个无法让我得到结果,但是 更改为第二个可以按预期获取我的结果。还没有 探究原因。

      【讨论】:

        【解决方案3】:

        您的配置中有 0 个 spark 执行器 - 我相信您必须增加它。此外,查看日志是否有任何错误。

        【讨论】:

        • 我现在看到你在命令行中指定了执行者的数量作为参数,所以忽略我建议的那部分。
        【解决方案4】:

        在头疼了几天之后,我再次检查了我的 kinesis 配置并将聚合模式更改为关闭,它解决了整个问题。 具有聚合模式的 kinesis 在本地为我工作, 但不是在集群模式下的 emr 中

        【讨论】:

        • 你是如何改变聚合模式的?
        • 如果你使用logstash(就像我们一样),你可以在logstach配置中改变它,基本上这可以解决这个问题:issues.apache.org/jira/browse/SPARK-14421。但是在大规模使用 spark 和 kinesis 几个月后(每小时 100gb),我的建议是远离 kinesis,它在小规模上工作,但是一旦你开始用许多原木生产者加载它,我们就会遇到很多延迟问题原木到达时间和很多原木不止一次到达
        • 感谢您对 Kinesis 的提醒。尽管在任何配置选项上都找不到任何文档来更改此聚合模式。你有类似的链接吗?我最终回滚到它工作的 Spark 1.4.1,但只给了我实际的有效负载,但没有元数据。
        猜你喜欢
        • 2018-09-27
        • 2018-01-12
        • 2017-06-03
        • 1970-01-01
        • 2016-02-10
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多