【发布时间】:2016-03-06 15:40:52
【问题描述】:
我尝试在 EMR 4.3 上以集群模式运行 Kinesis Streaming 字数统计示例,但未成功。具体来说,即使我可以访问流的元数据,也没有从 Kinesis 读取任何消息。
相同的代码确实在相同的 EMR 集群上以客户端模式运行(即使用“local[*]”),但是当我尝试在集群模式下执行此操作时,kinesis 接收器的第一个作业卡住了:
最初我认为这是资源/线程数的问题,但根据配置以及我在 YARN 和 Spark UI 中看到的内容,情况似乎并非如此(请参阅下面的所有相关配置)。
我正在寻找有关应用程序为何无法从 Kinesis 读取数据的任何指示,或者寻找建议的配置或设置更改以使其在集群模式下工作。
配置和设置细节
相关的 Kinesis 流有一个 Shard。
我在 EMR 集群设置中使用如下配置:
[{"classification":"capacity-scheduler",
"properties":{"yarn.scheduler.capacity.resource-calculator":"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"}},
{"classification":"spark","properties":{"maximizeResourceAllocation":"true"}},
{"classification":"spark-defaults","properties":{
"spark.executor.instances":"0",
"spark.dynamicAllocation.enabled":"true"}}]
我尝试运行的代码:
val appName = "ks_"+DateTime.now().toString(formatter);
val sparkConf = new SparkConf().setAppName(appName)
val sc = new SparkContext(sparkConf);
val batchIntervalInSec = 5
val batchInterval = Seconds(batchIntervalInSec)
val ssc = new StreamingContext(sc, batchInterval)
ssc.checkpoint("/checkpoint")
val kinesisClient = new AmazonKinesisClient(credentials)
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
val numStreams = numShards
val kinesisCheckpointInterval = Seconds(batchIntervalInSec-1)
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
val kinesisStreams = (0 until numStreams).map { i =>
KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_ONLY)
}
val unionStreams = ssc.union(kinesisStreams)
val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination();
这是我运行的 spark 命令:
spark-submit --deploy-mode cluster
--class com.komoona.spark.kmn_spark_scala.KinesisStream
--master yarn --conf spark.executor.cores=4
--conf spark.executor.instances=2
--conf spark.streaming.blockInterval=1000ms
--jars /home/hadoop/lib/spark-streaming-kinesis-asl-assembly_2.10-1.6.0.jar,/home/hadoop/lib/amazon-kinesis-client-1.6.1.jar,
test_app_full.jar
编辑: 我注意到尽管配置了 2 个执行器(如命令行中指定的那样),但在 spark UI 中只显示了一个执行器和驱动程序运行:
这可能是问题的根源吗?有什么想法会导致这种情况吗?
【问题讨论】:
标签: apache-spark spark-streaming amazon-emr amazon-kinesis