【发布时间】:2015-11-27 17:42:55
【问题描述】:
tldr; 无法使用 Kinesis Spark Streaming 集成,因为它不接收数据。
- 测试流设置完毕,nodejs 应用每秒发送 1 条简单记录。
- 标准 Spark 1.5.2 集群设置有主节点和工作节点(4 核),环境中带有 docker-compose、AWS 凭证
-
spark-streaming-kinesis-asl-assembly_2.10-1.5.2.jar已下载并添加到类路径中 -
job.py或job.jar(仅阅读和打印)已提交。 - 似乎一切正常,但没有收到任何记录。
KCL Worker 线程有时会说“Sleeping ...”——它可能会被静默地破坏(我检查了所有能找到的 stderr,但没有任何提示)。也许吞下了 OutOfMemoryError ......但我怀疑这一点,因为每秒 1 条记录的数量。
------------------------------------------ 时间:1448645109000 毫秒 ------------------------------------------ 27 年 15 月 11 日 17:25:09 信息 JobScheduler:已完成作业流作业 1448645109000 ms.0 来自作业集的时间 1448645109000 ms 27 年 15 月 11 日 17:25:09 信息 KinesisBackedBlockRDD:从持久性列表中删除 RDD 102 15/11/27 17:25:09 INFO JobScheduler:总延迟:0.002 秒,时间 1448645109000 毫秒(执行:0.001 秒) 27 年 15 月 11 日 17:25:09 信息块管理器:删除 RDD 102 27 年 15 月 11 日 17:25:09 信息 KinesisInputDStream:在 NewClass.java:25 的时间 1448645109000 毫秒的 createStream 处删除 RDD KinesisBackedBlockRDD[102] 块 15/11/27 17:25:09 INFO ReceivedBlockTracker: 删除批次 ArrayBuffer(1448645107000 ms) 15/11/27 17:25:09 INFO InputInfoTracker:删除旧批次元数据:1448645107000 ms 27 年 15 月 11 日 17:25:10 信息 JobScheduler:为时间 1448645110000 毫秒添加了作业 27 年 15 月 11 日 17:25:10 信息 JobScheduler:从时间 1448645110000 毫秒的作业集开始作业流作业 1448645110000 ms.0 ------------------------------------------ 时间:1448645110000 毫秒 ------------------------------------------请让我知道任何提示,我真的很想使用 Spark 进行实时分析......除了这个不接收数据的小细节 :) 似乎没问题。
PS:我觉得奇怪的是 Spark 忽略了我的存储级别(内存和磁盘 2)和检查点间隔(20,000 毫秒)的设置
27 年 15 月 11 日 17:23:26 信息 KinesisInputDStream:metadataCleanupDelay = -1 27 年 15 月 11 日 17:23:26 信息 KinesisInputDStream:滑动时间 = 1000 毫秒 27 年 15 月 11 日 17:23:26 信息 KinesisInputDStream:存储级别 = StorageLevel(假,假,假,假,1) 27 年 15 月 11 日 17:23:26 信息 KinesisInputDStream:检查点间隔 = null 27 年 15 月 11 日 17:23:26 信息 KinesisInputDStream:记住持续时间 = 1000 毫秒 15/11/27 17:23:26 INFO KinesisInputDStream:初始化和验证 org.apache.spark.streaming.kinesis.KinesisInputDStream@74b21a6源代码(java):
公共类新类{ 公共静态无效主要(字符串[]参数){ SparkConf conf = new SparkConf().setAppName("appname").setMaster("local[3]"); JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000)); JavaReceiverInputDStream kinesisStream = KinesisUtils.createStream( ssc,“webassist-test”,“测试”,“https://kinesis.us-west-1.amazonaws.com”,“us-west-1”, InitialPositionInStream.LATEST, 新的持续时间(20000), StorageLevel.MEMORY_AND_DISK_2() ); kinesisStream.print(); ssc.start(); ssc.awaitTermination(); } }Python 代码(之前尝试过 pprinting 和发送到 MongoDB):
从 pyspark.streaming.kinesis 导入 KinesisUtils,InitialPositionInStream 从 pyspark 导入 SparkContext,StorageLevel 从 pyspark.streaming 导入 StreamingContext 从系统导入 argv sc = SparkContext(appName="webassist-test") ssc = StreamingContext(sc, 5) 流 = KinesisUtils.createStream(ssc, "应用名称", “测试”, "https://kinesis.us-west-1.amazonaws.com", “我们-西-1”, InitialPositionInStream.LATEST, 5、 StorageLevel.MEMORY_AND_DISK_2) 流.pprint() ssc.start() ssc.awaitTermination()注意:我还尝试使用 stream.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition)) 将数据发送到 MongoDB,但没有将其粘贴到此处,因为您需要一个 MongoDB 实例,并且它与问题无关 - 输入中已经没有记录。
还有一件事——KCL 从不提交。对应的 DynamoDB 如下所示:
租用密钥检查点租用计数器租用所有者所有者SwitchesSinceCheckpoint shardId-000000000000 最新的 614 本地主机:d92516 ... 8用于提交的命令:
spark-submit --executor-memory 1024m --master spark://IpAddress:7077 /path/test.py
在 MasterUI 中我可以看到:
Input Rate
Receivers: 1 / 1 active
Avg: 0.00 events/sec
KinesisReceiver-0
Avg: 0.00 events/sec
...
Completed Batches (last 76 out of 76)
感谢您的帮助!
【问题讨论】:
-
你能贴一下Stream上下文创建、DStream转换和打印的源代码吗?我们可以看看。
-
也有同样的问题。
-
你解决了吗?
标签: apache-spark spark-streaming amazon-kinesis