【问题标题】:Add scoped variable per row iteration in Apache Spark在 Apache Spark 中为每行迭代添加范围变量
【发布时间】:2017-07-21 09:03:14
【问题描述】:

我正在将多个 html 文件读入 Spark 中的数据框。 我正在使用自定义 udf 将 html 的元素转换为数据框中的列

val dataset = spark
  .sparkContext
  .wholeTextFiles(inputPath)
  .toDF("filepath", "filecontent")
  .withColumn("biz_name", parseDocValue(".biz-page-title")('filecontent))
  .withColumn("biz_website", parseDocValue(".biz-website a")('filecontent))

  ...

  def parseDocValue(cssSelectorQuery: String) = 
     udf((html: String) => Jsoup.parse(html).select(cssSelectorQuery).text())

效果很好,但是每个withColumn 调用都会导致对html字符串的解析,这是多余的。

有没有办法(不使用查找表等)我可以根据每行的“文件内容”列生成 1 个已解析的文档 (Jsoup.parse(html)),并使其可用于数据框中的所有 withColumn 调用?

或者我不应该尝试使用 DataFrames 而只使用 RDD 吗?

【问题讨论】:

  • 你能用示例文本字符串更新吗?
  • 我在本质上遇到了“wholeTextFiles”的非并行化问题(例如,在我什至可以重新分区之前,64 个核心集群上的 2 个执行程序),所以我可能会重写整个事情。当我解决这个问题时,我会更新并查看建议。很抱歉给您带来不便
  • 你解决了吗?
  • 不,我发现我有一个更大的问题,我必须先解决这个问题。不过,不知道解决方案是否会让我回到这一点。

标签: scala apache-spark spark-dataframe user-defined-functions rdd


【解决方案1】:

所以最后的答案其实很简单:

只需映射行并在那里创建对象

def docValue(cssSelectorQuery: String, attr: Option[String] = None)(implicit document: Document): Option[String] = {
    val domObject = document.select(cssSelectorQuery)

    val domValue = attr match {
      case Some(a) => domObject.attr(a)
      case None => domObject.text()
    }

    domValue match {
      case x if x == null || x.isEmpty => None
      case y => Some(y)
    }
  }

 val dataset = spark
      .sparkContext
      .wholeTextFiles(inputPath, minPartitions = 265) 
      .map {
        case (filepath, filecontent) => {
          implicit val document = Jsoup.parse(filecontent)

          val customDataJson = docJson(filecontent, customJsonRegex)


          DataEntry(
            biz_name = docValue(".biz-page-title"),
            biz_website = docValue(".biz-website a"),
            url = docValue("meta[property=og:url]", attr = Some("content")),
            ...
            filename = Some(fileName(filepath)),
            fileTimestamp = Some(fileTimestamp(filepath))
          )
        }
      }
      .toDS()

【讨论】:

    【解决方案2】:

    我可能会重写如下,一次性完成解析和选择并将它们放在一个临时列中:

    val dataset = spark
      .sparkContext
      .wholeTextFiles(inputPath)
      .withColumn("temp", parseDocValue(Array(".biz-page-title", ".biz-website a"))('filecontent))
      .withColumn("biz_name", col("temp")(0))
      .withColumn("biz_website", col("temp")(1))
      .drop("temp")
    
    def parseDocValue(cssSelectorQueries: Array[String]) =
    udf((html: String) => {
      val j = Jsoup.parse(html)
      cssSelectorQueries.map(query => j.select(query).text())})
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-05-23
      • 2016-03-20
      • 2018-05-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多