【问题标题】:Apache Spark Kinesis Sample not workingApache Spark Kinesis 示例不工作
【发布时间】:2014-11-15 02:27:40
【问题描述】:

我正在尝试运行 JavaKinesisWordCountASL 示例。

该示例似乎连接到我的 Kinesis Stream 并从流中获取数据(如下面的日志所示)。但是,Sparks 不会调用示例中传递给 unionStreams.flatMap 方法的调用函数,也不会打印任何字数。

我尝试过使用 Java 8 和 Java 7 运行。我在 ubuntu 实例上运行它。同样的例子也适用于我的 macbook。

14/11/15 01:59:42 INFO scheduler.ReceiverTracker:流 1 收到 0 个块 14/11/15 01:59:42 INFO storage.MemoryStore: ensureFreeSpace(264) 调用 curMem=3512, maxMem=938244833 14/11/15 01:59:42 INFO storage.MemoryStore: 块 input-0-1416016781800 作为值存储在内存中(估计大小 264.0 B,可用 894.8 MB) 2015 年 14 月 11 日 01:59:42 INFO storage.BlockManagerInfo:在 ip-10-80-91-13.ec2.internal:39149 的内存中添加了 input-0-1416016781800(大小:264.0 B,免费:894.8 MB) 2015 年 14 月 11 日 01:59:42 INFO storage.BlockManagerMaster:更新了块 input-0-1416016781800 的信息 14/11/15 01:59:42 INFO scheduler.JobScheduler:为时间 1416016782000 毫秒添加作业 2015 年 14 月 11 日 01:59:42 INFO network.SendingConnection:启动到 [ip-10-80-91-13.ec2.internal/10.80.91.13:39149] 的连接 14/11/15 01:59:42 INFO network.SendingConnection:连接到 [ip-10-80-91-13.ec2.internal/10.80.91.13:39149],1 条消息待处理 14/11/15 01:59:42 INFO network.ConnectionManager: 接受来自 [ip-10-80-91-13.ec2.internal/10.80.91.13:56700] 的连接 14/11/15 01:59:42 WARN storage.BlockManager: Block input-0-1416016781800 在这台机器上已经存在;不重新添加 2015 年 14 月 11 日 01:59:42 信息接收器。块生成器:推送块输入-0-1416016781800 14/11/15 01:59:43 INFO storage.MemoryStore: ensureFreeSpace(256) 调用 curMem=3776, maxMem=938244833 14/11/15 01:59:43 INFO storage.MemoryStore: 块 input-0-1416016782800 作为值存储在内存中(估计大小 256.0 B,可用 894.8 MB) 14/11/15 01:59:43 INFO storage.BlockManagerInfo:在 ip-10-80-91-13.ec2.internal:39149 的内存中添加了 input-0-1416016782800(大小:256.0 B,免费:894.8 MB) 2015 年 14 月 11 日 01:59:43 INFO storage.BlockManagerMaster:更新了块输入-0-1416016782800 的信息 14/11/15 01:59:43 WARN storage.BlockManager: Block input-0-1416016782800 已经存在于这台机器上;不重新添加 2015 年 14 月 11 日 01:59:43 信息接收器。块生成器:推送块输入-0-1416016782800 14/11/15 01:59:44 INFO scheduler.ReceiverTracker:流 0 收到 2 个块 2015 年 14 月 11 日 01:59:44 INFO scheduler.ReceiverTracker:流 1 收到 0 个块 2015 年 14 月 11 日 01:59:44 INFO scheduler.JobScheduler:为时间 1416016784000 毫秒添加了作业 2015 年 14 月 11 日 01:59:46 INFO scheduler.ReceiverTracker:流 0 收到 0 个块 14/11/15 01:59:46 INFO scheduler.ReceiverTracker:流 1 收到 0 个块 14/11/15 01:59:46 INFO scheduler.JobScheduler:为时间 1416016786000 ms 添加作业 14/11/15 01:59:46 INFO impl.CWPublisherRunnable:成功发布 17 个数据。 14/11/15 01:59:46 INFO storage.MemoryStore: ensureFreeSpace(248) 调用 curMem=4032, maxMem=938244833 14/11/15 01:59:46 INFO storage.MemoryStore: 块 input-1-1416016786000 作为值存储在内存中(估计大小 248.0 B,可用 894.8 MB) 14/11/15 01:59:46 INFO storage.BlockManagerInfo:在 ip-10-80-91-13.ec2.internal:39149 的内存中添加了 input-1-1416016786000(大小:248.0 B,免费:894.8 MB) 14/11/15 01:59:46 INFO storage.BlockManagerMaster:更新了块 input-1-1416016786000 的信息 14/11/15 01:59:46 WARN storage.BlockManager: Block input-1-1416016786000 在这台机器上已经存在;不重新添加 2015 年 14 月 11 日 01:59:46 信息接收器。块生成器:推送块输入 1-1416016786000 14/11/15 01:59:46 INFO impl.CWPublisherRunnable: 成功发布 14 个数据。 2015 年 14 月 11 日 01:59:48 INFO scheduler.ReceiverTracker:流 0 收到 0 个块 14/11/15 01:59:48 INFO storage.MemoryStore: ensureFreeSpace(264) 调用 curMem=4280, maxMem=938244833 14/11/15 01:59:48 INFO scheduler.ReceiverTracker:流 1 收到 1 个块 14/11/15 01:59:48 INFO storage.MemoryStore: 块 input-0-1416016787800 作为值存储在内存中(估计大小 264.0 B,可用 894.8 MB) 2015 年 14 月 11 日 01:59:48 INFO storage.BlockManagerInfo:在 ip-10-80-91-13.ec2.internal:39149 的内存中添加了 input-0-1416016787800(大小:264.0 B,免费:894.8 MB) 2015 年 14 月 11 日 01:59:48 INFO storage.BlockManagerMaster:更新了块输入-0-1416016787800 的信息 14/11/15 01:59:48 INFO scheduler.JobScheduler:为时间 1416016788000 毫秒添加作业 14/11/15 01:59:48 WARN storage.BlockManager: Block input-0-1416016787800 已经存在于这台机器上;不重新添加 2015 年 14 月 11 日 01:59:48 信息接收器。块生成器:推送块输入-0-1416016787800 14/11/15 01:59:50 INFO scheduler.ReceiverTracker:流 0 收到 1 个块 14/11/15 01:59:50 INFO scheduler.ReceiverTracker:流 1 收到 0 个块 2015 年 14 月 11 日 01:59:50 INFO scheduler.JobScheduler:为时间 1416016790000 毫秒添加了作业 14/11/15 01:59:51 INFO storage.MemoryStore: ensureFreeSpace(264) 调用 curMem=4544, maxMem=938244833 14/11/15 01:59:51 INFO storage.MemoryStore: 块 input-0-1416016790800 作为值存储在内存中(估计大小 264.0 B,可用 894.8 MB) 14/11/15 01:59:51 INFO storage.BlockManagerInfo:在 ip-10-80-91-13.ec2.internal:39149 的内存中添加了 input-0-1416016790800(大小:264.0 B,免费:894.8 MB) 14/11/15 01:59:51 INFO storage.BlockManagerMaster:更新了块输入-0-1416016790800 的信息 14/11/15 01:59:51 WARN storage.BlockManager: Block input-0-1416016790800 已经存在于这台机器上;不重新添加 14/11/15 01:59:51 INFO receiver.BlockGenerator: 推送块 input-0-1416016790800

【问题讨论】:

  • 来自 SO 指南:寻求调试帮助的问题(“为什么这段代码不起作用?”)必须包括所需的行为、特定问题或错误以及在问题本身。没有明确问题陈述的问题对其他读者没有用处。另见:stackoverflow.com/help/mcve

标签: java apache-spark amazon-kinesis


【解决方案1】:

这可能与您获得了多少工作线程有关。当我使用 --master local[2] 运行应用程序时,我遇到了同样的问题。我花了很多时间寻找答案,但一无所获。只是出于好奇,我改为 --master local[4] 并且它起作用了。我不知道根本原因。也许更熟悉 Spark 的人可以启发我们。

注意:在我的例子中,我的 Kinesis 流有两个分片。因此,该应用创建了两个输入流,每个分片一个。

【讨论】:

  • 这就是问题所在。我用 1 个分片创建了一个新流,程序开始工作。我研究了更多,发现集群中有 1 个分片至少需要 2 个核心,2 个分片需要 4 个核心,3 个分片至少需要 6 个核心。
【解决方案2】:

感谢@user3594557 的提示。

https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html#input-dstreams有两个大笔记

如果分配给应用程序的核心数小于或 等于输入 DStreams/receiver 的数量,则系统 将接收数据,但无法处理它们。

在本地运行时,如果你的master URL设置为“local”,那么有 只是运行任务的一个核心。这对于具有 甚至一个输入 DStream(文件流都可以),因为接收器会 占据那个核心,就没有核心来处理数据了。

【讨论】:

  • 我仍然无法解决这个问题,我有 5 个分片并且在 master 中提到了 local[20]。它仍然向我显示空白 rdds。还有其他建议吗?相同的字数示例。
  • @RockSolid:你能解决这个问题吗?我也有同样的问题。我正在使用自定义接收器
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-03-21
  • 2023-03-11
  • 2017-04-07
  • 2018-02-18
  • 1970-01-01
  • 2018-03-02
  • 2018-07-10
相关资源
最近更新 更多