【问题标题】:Apache Spark - accessing internal data on RDDs?Apache Spark - 访问 RDD 上的内部数据?
【发布时间】:2015-09-30 08:35:21
【问题描述】:

我开始做amp-camp 5 exercises。我尝试了以下两种情况:

场景#1

val pagecounts = sc.textFile("data/pagecounts")
pagecounts.checkpoint
pagecounts.count

场景#2

val pagecounts = sc.textFile("data/pagecounts")
pagecounts.count

两者在 Spark shell 应用程序 UI 中显示的总时间不同
场景 #1 耗时 0.5 秒,而场景 #2 仅耗时 0.2 s

在场景 #1 中,检查点命令什么都不做,它既不是 转变也不是行动。据说一旦 RDD 实现 操作后,继续并保存到磁盘。我在这里错过了什么吗?

问题:

  1. 我知道场景 #1 需要更多时间,因为 RDD 是 检查点(写入磁盘)。有没有办法知道所花费的时间 检查点,从总时间开始?
    Spark shell 应用程序 UI 显示以下内容 - 调度程序延迟、任务 反序列化时间、GC时间、Result序列化时间、获取结果 时间。但是,没有显示检查点的故障。

  2. 有没有办法访问上述指标,例如调度器延迟,GC时间 并以编程方式保存它们?我想记录一些上述指标 在 RDD 上调用的每个操作。

  3. 如何以编程方式访问以下信息:

    • RDD 的大小,当在检查点上持久化到磁盘时?
    • 目前内存中有多少百分比的 RDD?
    • 计算 RDD 所花费的总时间?

如果您需要更多信息,请告诉我。

【问题讨论】:

    标签: apache-spark rdd checkpointing


    【解决方案1】:

    Spark REST API 几乎可以满足您的所有要求。

    一些例子;

    目前内存中有多少百分比的 RDD?

    GET /api/v1/applications/[app-id]/storage/rdd/0

    将回复:

    {
      "id" : 0,
      "name" : "ParallelCollectionRDD",
      "numPartitions" : 2,
      "numCachedPartitions" : 2,
      "storageLevel" : "Memory Deserialized 1x Replicated",
      "memoryUsed" : 28000032,
      "diskUsed" : 0,
      "dataDistribution" : [ {
        "address" : "localhost:54984",
        "memoryUsed" : 28000032,
        "memoryRemaining" : 527755733,
        "diskUsed" : 0
      } ],
      "partitions" : [ {
        "blockName" : "rdd_0_0",
        "storageLevel" : "Memory Deserialized 1x Replicated",
        "memoryUsed" : 14000016,
        "diskUsed" : 0,
        "executors" : [ "localhost:54984" ]
      }, {
        "blockName" : "rdd_0_1",
        "storageLevel" : "Memory Deserialized 1x Replicated",
        "memoryUsed" : 14000016,
        "diskUsed" : 0,
        "executors" : [ "localhost:54984" ]
      } ]
    }
    

    计算一个 RDD 所花费的总时间?

    计算 RDD 也称为作业、阶段或尝试。 GET /applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary

    将回复:

    {
      "quantiles" : [ 0.05, 0.25, 0.5, 0.75, 0.95 ],
      "executorDeserializeTime" : [ 2.0, 2.0, 2.0, 2.0, 2.0 ],
      "executorRunTime" : [ 3.0, 3.0, 4.0, 4.0, 4.0 ],
      "resultSize" : [ 1457.0, 1457.0, 1457.0, 1457.0, 1457.0 ],
      "jvmGcTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
      "resultSerializationTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
      "memoryBytesSpilled" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
      "diskBytesSpilled" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
      "shuffleReadMetrics" : {
        "readBytes" : [ 340.0, 340.0, 342.0, 342.0, 342.0 ],
        "readRecords" : [ 10.0, 10.0, 10.0, 10.0, 10.0 ],
        "remoteBlocksFetched" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
        "localBlocksFetched" : [ 2.0, 2.0, 2.0, 2.0, 2.0 ],
        "fetchWaitTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
        "remoteBytesRead" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
        "totalBlocksFetched" : [ 2.0, 2.0, 2.0, 2.0, 2.0 ]
      }
    }
    

    你的问题太笼统了,我就不一一回复了。我相信 spark 必须反映的一切都反映在 REST API 中。

    【讨论】:

    • 我知道我的问题很广泛,感谢您的回答。我可以在应用程序运行时使用 REST api 吗?
    • 是的。实际上,对于自包含应用程序,您只能在应用程序运行时访问 REST API。应用程序终止后,API server 将随之终止。
    • 我现在明白了。在我的应用程序停止后,我试图访问 REST api。如果是这种情况,我如何在我的 spark 应用程序中访问应用程序 ID。我想在我的程序中执行每个操作后记录上述数据。
    • 这已经失控了,`sc.set("spark.app.id", "myId") 可以。您可以在 SO 上轻松找到。
    猜你喜欢
    • 2015-07-08
    • 2015-06-14
    • 2014-05-13
    • 1970-01-01
    • 1970-01-01
    • 2016-04-26
    • 2016-04-03
    • 2017-03-07
    • 1970-01-01
    相关资源
    最近更新 更多