【Question title】: What is the difference between using DataFrame and RDD in Spark 1.5.2?
【Posted】: 2018-12-25 19:59:55
【Question】:

I read data from MongoDB and then map it to InteractionItem:

```scala
val df = filterByParams(startTs, endTs, widgetIds, documents)
  // skip documents coming from localhost and documents with excluded triggers
  .filter(item => {
    item._2.get("url") != "localhost" && !EXCLUDED_TRIGGERS.contains(item._2.get("trigger"))
  })
  // one MongoDB document can yield several InteractionItems (one per matching publisher)
  .flatMap(item => {
    var res = Array[InteractionItem]()

    try {
      val widgetId = item._2.get("widgetId").toString
      val timestamp = java.lang.Long.parseLong(item._2.get("time").toString)
      val extra = item._2.get("extra").toString
      val extras = parseExtra(extra)
      val c = parseUserAgent(extras.userAgent.getOrElse(""))
      val os = c.os.family
      val osVersion = c.os.major
      val device = c.device.family
      val browser = c.userAgent.family
      val browserVersion = c.userAgent.major
      val adUnit = extras.adunit.get
      val gUid = extras.guid.get
      val trigger = item._2.get("trigger").toString
      val objectName = item._2.get("object").toString
      val response = item._2.get("response").toString
      // truncate the timestamp to the start of its hour
      val ts: Long = timestamp - timestamp % 3600

      // look up the interaction configuration matching trigger/object/response
      val interaction = interactionConfiguration.filter(interaction =>
        interaction.get("trigger") == trigger &&
          interaction.get("object") == objectName &&
          interaction.get("response") == response).head
      val clickThrough = interaction.get("clickThrough").asInstanceOf[Boolean]
      val interactionId = interaction.get("_id").toString

      adUnitPublishers.filter(x => x._2._2.toString == widgetId && x._1.toString == adUnit).foreach(publisher => {
        res = res :+ InteractionItem(widgetId, ts, adUnit, publisher._2._1.toString, os, osVersion, device, browser, browserVersion,
          interactionId, clickThrough, 1L, gUid)
      })
      bdPublishers.filter(x => x._1.toString == widgetId).foreach(publisher => {
        res = res :+ InteractionItem(widgetId, ts, adUnit, publisher._2.toString, os, osVersion, device, browser, browserVersion,
          interactionId, clickThrough, 1L, gUid)
      })
    }
    catch {
      case e: Exception => {
        // on any parse or lookup failure, emit an invalid item
        // (presumably with interactionCount <= 0, so the final filter drops it)
        log.info(e.getMessage)
        res = res :+ InteractionItem.invalid()
      }
    }
    res
  }).filter(i => i.interactionCount > 0)
```
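The line `val ts: Long = timestamp - timestamp % 3600` buckets every event into the start of its hour, which is presumably the `date` value used as a grouping key later. A minimal sketch of that arithmetic, assuming `time` is stored as non-negative epoch seconds (for milliseconds the divisor would have to be 3600000L); the object and the helper name `hourBucket` are hypothetical:

```scala
object HourBucket {
  // Truncate an epoch-seconds timestamp to the start of its hour,
  // mirroring `timestamp - timestamp % 3600` in the question's code.
  // Assumes non-negative timestamps in seconds, not milliseconds.
  def hourBucket(timestamp: Long): Long = timestamp - timestamp % 3600

  def main(args: Array[String]): Unit = {
    // 1545767995 is 2018-12-25 19:59:55 UTC
    println(hourBucket(1545767995L)) // prints 1545764400 (19:00:00 UTC)
    // a timestamp already on the hour is left unchanged
    println(hourBucket(1545764400L)) // prints 1545764400
  }
}
```

All events within the same hour therefore share one `ts`, which is what lets the later aggregation collapse them.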

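With the RDD approach, I map again and then call reduceByKey:

```scala
// key = 9-field tuple, value = interactionCount; counts are summed per key
.map(i => ((i.widgetId, i.date, i.section, i.publisher, i.os, i.device, i.browser, i.clickThrough, i.id), i.interactionCount))
.reduceByKey((a, b) => a + b)
```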
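reduceByKey adds up interactionCount for every distinct key tuple, and Spark merges values locally on each partition before the shuffle (like a combiner in MapReduce), so at most one partial sum per key per partition is sent over the network. The plain-Scala sketch below imitates those two phases on local collections, without any Spark dependency; `Key` is a hypothetical, shortened stand-in for the nine-field tuple above, and the "partitions" are just nested sequences:

```scala
object ReduceByKeySketch {
  // Hypothetical, shortened key; the question uses a 9-field tuple
  // (widgetId, date, section, publisher, os, device, browser, clickThrough, id).
  type Key = (String, Long, String)

  // Phase 1: sum values per key inside a single partition (map-side combine).
  def combineLocally(partition: Seq[(Key, Long)]): Map[Key, Long] =
    partition.foldLeft(Map.empty[Key, Long]) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0L) + v)
    }

  // Phase 2: merge the per-partition partial sums, as happens after the shuffle.
  def reduceByKey(partitions: Seq[Seq[(Key, Long)]]): Map[Key, Long] =
    partitions.map(combineLocally).foldLeft(Map.empty[Key, Long]) { (acc, partial) =>
      partial.foldLeft(acc) { case (a, (k, v)) => a.updated(k, a.getOrElse(k, 0L) + v) }
    }

  def main(args: Array[String]): Unit = {
    val k1: Key = ("w1", 1545764400L, "pub1")
    val k2: Key = ("w2", 1545764400L, "pub2")
    // Every InteractionItem in the question is created with interactionCount = 1L.
    val partitions = Seq(
      Seq(k1 -> 1L, k1 -> 1L, k2 -> 1L),
      Seq(k1 -> 1L, k2 -> 1L)
    )
    println(reduceByKey(partitions)) // sums: k1 -> 3, k2 -> 2
  }
}
```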

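With the DataFrame approach, I instead end the chain with toDF() and query it with SQL:

```scala
// toDF() on an RDD of case classes needs `import sqlContext.implicits._` in scope
.toDF()

df.registerTempTable("interactions")
df.cache()
val v = sqlContext.sql("SELECT id, clickThrough, widgetId, date, section, publisher, os, device, browser, interactionCount" +
  " FROM interactions GROUP BY id, clickThrough, widgetId, date, section, publisher, os, device, browser, interactionCount")
```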

From what I see in the Spark UI, the DataFrame version takes 210 stages.

The RDD version takes only 20 stages:

What am I doing wrong here?

【Discussion】:

    Tags: mongodb apache-spark apache-spark-sql


    【Solution 1】:

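    The operations you apply to the RDD and to the DataFrame are not the same.
    The DataFrame version takes longer because of these additional steps:

    1. registerTempTable()
    2. cache()

    The RDD is only reduced by the single given key expression, whereas the DataFrame version treats the whole dataset as a table and prepares it for caching, which consumes extra CPU and storage resources.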
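To make "the operations are not the same" concrete: the SQL query in the question also lists interactionCount in GROUP BY and has no SUM, so each group is just one distinct row. Since every item is built with interactionCount = 1L, the counts are never added up, unlike with reduceByKey. A DataFrame counterpart of the RDD version would presumably group by the nine key columns only and aggregate with sum("interactionCount") from org.apache.spark.sql.functions. The sketch below shows the semantic difference on plain Scala collections (no Spark needed); `Row` is a hypothetical three-field stand-in for InteractionItem:

```scala
object GroupBySemantics {
  // Hypothetical stand-in for InteractionItem with only three fields.
  final case class Row(widgetId: String, publisher: String, interactionCount: Long)

  // "SELECT widgetId, publisher, interactionCount ... GROUP BY widgetId, publisher,
  // interactionCount" returns one row per distinct combination; the count is
  // carried through unchanged, not summed. That is the same as a DISTINCT.
  def groupByIncludingCount(rows: Seq[Row]): Set[Row] = rows.toSet

  // What reduceByKey (or GROUP BY widgetId, publisher with SUM(interactionCount)) returns.
  def groupAndSum(rows: Seq[Row]): Map[(String, String), Long] =
    rows.groupBy(r => (r.widgetId, r.publisher)).map { case (key, group) =>
      key -> group.map(_.interactionCount).sum
    }

  def main(args: Array[String]): Unit = {
    // Every item in the question is created with interactionCount = 1L.
    val rows = Seq(Row("w1", "p1", 1L), Row("w1", "p1", 1L), Row("w1", "p1", 1L), Row("w2", "p1", 1L))
    println(groupByIncludingCount(rows)) // two distinct rows, each still with count 1
    println(groupAndSum(rows))           // (w1,p1) -> 3, (w2,p1) -> 1
  }
}
```

So before comparing stage counts, it is worth making sure both versions compute the same result.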

    【Comments】:

    • I left out the registerTempTable and cache operations and used something like toDF().groupBy($"column1", $"column2", ...) instead, but it is still 210 stages and takes the same amount of time.