【问题标题】:Apache Spark Kinesis Integration: connected, but no records receivedApache Spark Kinesis 集成:已连接,但未收到记录
【发布时间】:2015-11-27 17:42:55
【问题描述】:

tldr; 无法使用 Kinesis Spark Streaming 集成,因为它不接收数据。

  1. 测试流设置完毕,nodejs 应用每秒发送 1 条简单记录。
  2. 标准 Spark 1.5.2 集群设置有主节点和工作节点(4 核),环境中带有 docker-compose、AWS 凭证
  3. spark-streaming-kinesis-asl-assembly_2.10-1.5.2.jar 已下载并添加到类路径中
  4. job.pyjob.jar(仅阅读和打印)已提交。
  5. 似乎一切正常,但没有收到任何记录。

KCL Worker 线程有时会说“Sleeping ...”——它可能会被静默地破坏(我检查了所有能找到的 stderr,但没有任何提示)。也许吞下了 OutOfMemoryError ......但我怀疑这一点,因为每秒 1 条记录的数量。

------------------------------------------ 时间:1448645109000 毫秒 ------------------------------------------ 27 年 15 月 11 日 17:25:09 信息 JobScheduler:已完成作业流作业 1448645109000 ms.0 来自作业集的时间 1448645109000 ms 27 年 15 月 11 日 17:25:09 信息 KinesisBackedBlockRDD:从持久性列表中删除 RDD 102 15/11/27 17:25:09 INFO JobScheduler:总延迟:0.002 秒,时间 1448645109000 毫秒(执行:0.001 秒) 27 年 15 月 11 日 17:25:09 信息块管理器:删除 RDD 102 27 年 15 月 11 日 17:25:09 信息 KinesisInputDStream:在 NewClass.java:25 的时间 1448645109000 毫秒的 createStream 处删除 RDD KinesisBackedBlockRDD[102] 块 15/11/27 17:25:09 INFO ReceivedBlockTracker: 删除批次 ArrayBuffer(1448645107000 ms) 15/11/27 17:25:09 INFO InputInfoTracker:删除旧批次元数据:1448645107000 ms 27 年 15 月 11 日 17:25:10 信息 JobScheduler:为时间 1448645110000 毫秒添加了作业 27 年 15 月 11 日 17:25:10 信息 JobScheduler:从时间 1448645110000 毫秒的作业集开始作业流作业 1448645110000 ms.0 ------------------------------------------ 时间:1448645110000 毫秒 ------------------------------------------

请让我知道任何提示,我真的很想使用 Spark 进行实时分析......除了这个不接收数据的小细节 :) 似乎没问题。

PS:我觉得奇怪的是 Spark 忽略了我的存储级别(内存和磁盘 2)和检查点间隔(20,000 毫秒)的设置

27 年 15 月 11 日 17:23:26 信息 KinesisInputDStream:metadataCleanupDelay = -1 27 年 15 月 11 日 17:23:26 信息 KinesisInputDStream:滑动时间 = 1000 毫秒 27 年 15 月 11 日 17:23:26 信息 KinesisInputDStream:存储级别 = StorageLevel(假,假,假,假,1) 27 年 15 月 11 日 17:23:26 信息 KinesisInputDStream:检查点间隔 = null 27 年 15 月 11 日 17:23:26 信息 KinesisInputDStream:记住持续时间 = 1000 毫秒 15/11/27 17:23:26 INFO KinesisInputDStream:初始化和验证 org.apache.spark.streaming.kinesis.KinesisInputDStream@74b21a6

源代码(java):

公共类新类{ 公共静态无效主要(字符串[]参数){ SparkConf conf = new SparkConf().setAppName("appname").setMaster("local[3]"); JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000)); JavaReceiverInputDStream kinesisStream = KinesisUtils.createStream( ssc,“webassist-test”,“测试”,“https://kinesis.us-west-1.amazonaws.com”,“us-west-1”, InitialPositionInStream.LATEST, 新的持续时间(20000), StorageLevel.MEMORY_AND_DISK_2() ); kinesisStream.print(); ssc.start(); ssc.awaitTermination(); } }

Python 代码(之前尝试过 pprinting 和发送到 MongoDB):

从 pyspark.streaming.kinesis 导入 KinesisUtils,InitialPositionInStream 从 pyspark 导入 SparkContext,StorageLevel 从 pyspark.streaming 导入 StreamingContext 从系统导入 argv sc = SparkContext(appName="webassist-test") ssc = StreamingContext(sc, 5) 流 = KinesisUtils.createStream(ssc, "应用名称", “测试”, "https://kinesis.us-west-1.amazonaws.com", “我们-西-1”, InitialPositionInStream.LATEST, 5、 StorageLevel.MEMORY_AND_DISK_2) 流.pprint() ssc.start() ssc.awaitTermination()

注意:我还尝试使用 stream.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition)) 将数据发送到 MongoDB,但没有将其粘贴到此处,因为您需要一个 MongoDB 实例,并且它与问题无关 - 输入中已经没有记录。

还有一件事——KCL 从不提交。对应的 DynamoDB 如下所示:

租用密钥检查点租用计数器租用所有者所有者SwitchesSinceCheckpoint shardId-000000000000 最新的 614 本地主机:d92516 ... 8

用于提交的命令:

spark-submit --executor-memory 1024m --master spark://IpAddress:7077 /path/test.py

在 MasterUI 中我可以看到:

 Input Rate
   Receivers: 1 / 1 active
   Avg: 0.00 events/sec
 KinesisReceiver-0
   Avg: 0.00 events/sec
...
 Completed Batches (last 76 out of 76)

感谢您的帮助!

【问题讨论】:

  • 你能贴一下Stream上下文创建、DStream转换和打印的源代码吗?我们可以看看。
  • 也有同样的问题。
  • 你解决了吗?

标签: apache-spark spark-streaming amazon-kinesis


【解决方案1】:

过去我在连接 Kinesis 时遇到过 Spark Streaming 中没有显示记录活动的问题。

我会尝试这些方法以从 Spark 获得更多反馈/不同的行为:

  1. 确保使用 foreachRDDprint 等输出操作强制评估 DStream 转换操作另存为...

  2. 在创建流时使用“Kinesis 应用程序名称”参数的新名称在 DynamoDB 中创建新的 KCL 应用程序或清除现有应用程序。

  3. 在创建流时在 TRIM_HORIZON 和 LATEST 之间切换初始位置。

  4. 尝试这些更改时重新启动上下文。

添加代码后编辑: 也许我遗漏了一些明显的东西,但我看不出你的源代码有什么问题。您是否有 n+1 个 CPU 运行此应用程序(n 是 Kinesis 分片的数量)?

如果您运行从 docker 实例中的分片读取的 KCL 应用程序(Java/Python/...),它可以工作吗?也许您的网络配置有问题,但我希望有一些错误消息能指出这一点。

如果这足够重要/您有一点时间,您可以在您的 docker 实例中快速实现 kcl 阅读器,并允许您与您的 Spark 应用程序进行比较。一些网址:

Python

Java

Python example

另一种选择是在不同的集群中运行您的 Spark Streaming 应用程序并进行比较。

P.S.:我目前在不同集群中使用带有 Kinesis 的 Spark Streaming 1.5.2,它按预期处理记录/显示活动。

【讨论】:

  • 嗨!感谢您的回答。我已经在 J​​ava 和 Python 中添加了代码,你可以选择:)
  • 广告 1:我使用:foreachRDD,(p)打印广告 2:我尝试了多个流,多次删除(自动创建的)表广告 3:尝试了广告 4:它正在运行docker,一直在重启,一定是个大问题,但我不能指望它...
  • 我在容器中运行了 nodejs aws-sdk kinesis 阅读器,它工作正常。然后我启动了 Spark Streaming,但它没有。如果您有任何其他想法,我可以为您提供什么信息来解决这个谜团,请告诉我;)
  • 另一个想法:尝试运行开箱即用的 vanilla wordcount Kinesis Streaming 示例。它不会计算您要查找的内容,但应该会给您一些输出。如果确实如此,它会为您提供一个良好的起点和问题所在的线索。如果没有,请尝试运行 vanilla 生产者而不是您的 nodejs 生产者。如果仍然没有,请尝试在 ec2 或某些物理机中运行您的 Apache 集群。我知道它们不是好主意,但它们可能会帮助您获得更多反馈并进行横向思考。
  • 有趣的是在同一个 Docker 设置上我能够运行我通常的 Spark 批处理数据管道...也许我会尝试一些其他版本的 Spark...
【解决方案2】:

当我使用建议的文档和示例时,我遇到了这个问题,下面的 scala 代码对我来说很好(你总是可以使用 java 代替)--

val conf = ConfigFactory.load

val config = new SparkConf().setAppName(conf.getString("app.name"))

val ssc = new StreamingContext(config, Seconds(conf.getInt("app.aws.batchDuration")))

val stream = if (conf.hasPath("app.aws.key") && conf.hasPath("app.aws.secret")){
logger.info("Specifying AWS account using credentials.")
    KinesisUtils.createStream(
      ssc,
      conf.getString("app.name"),
      conf.getString("app.aws.stream"),
      conf.getString("app.aws.endpoint"),
      conf.getString("app.aws.region"),
      InitialPositionInStream.LATEST,
      Seconds(conf.getInt("app.aws.batchDuration")),
      StorageLevel.MEMORY_AND_DISK_2,
      conf.getString("app.aws.key"),
      conf.getString("app.aws.secret")
    )
  } else {
    logger.info("Specifying AWS account using EC2 profile.")
    KinesisUtils.createStream(
      ssc,
      conf.getString("app.name"),
      conf.getString("app.aws.stream"),
      conf.getString("app.aws.endpoint"),
      conf.getString("app.aws.region"),
      InitialPositionInStream.LATEST,
      Seconds(conf.getInt("app.aws.batchDuration")),
      StorageLevel.MEMORY_AND_DISK_2
    )
  }

stream.foreachRDD((rdd: RDD[Array[Byte]], time) => {
      val rddstr: RDD[String] = rdd
         .map(arrByte => new String(arrByte))
      rddstr.foreach(x => println(x))
}

【讨论】:

    猜你喜欢
    • 2016-06-13
    • 1970-01-01
    • 2017-05-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-04-15
    • 1970-01-01
    相关资源
    最近更新 更多