【问题标题】:Scala spark - count null value in dataframe columns using accumulatorScala spark - 使用累加器计算数据框列中的空值
【发布时间】:2020-03-20 10:49:06
【问题描述】:

我有这个功能:

def countNullValueColumn(df: DataFrame): Array[(String, Long)] = 
   df.columns
      .map(x => (x, df.filter(df(x).isNull || df(x) === "" || df(x).isNan).count))

我正在尝试使用 val counter = sc.longAccumulator 代替数据帧计数函数,但没有成功。

我所做的尝试是:

df.columns.foreach(x => {df.filter(df(x).isNull || df(x) === "" || df(x).isNaN) {counter.add(1)} (x, counter.value)})
df.columns.foreach(x => {df.filter(df(x).isNull || df(x) === "" || df(x).isNaN) {counter.add(1); (x, counter.value)} })

不幸的是,这些都不起作用,因为它没有返回正确的类型 (Array[(String, Long)])。

有人有什么想法或建议吗? 提前致谢

附:我不知道使用累加器是否比计数更有效,但我想尝试一下。

编辑:我应该使用foreach 而不是map 来避免累加器中的值错误吗?因为map 是一个转换,而foreach 是一个动作

Edit2:根据@DNA 的建议,我将代码中的map 更改为foreach

Edit3:好的,现在问题变成了尝试创建Array[(String, Long)]。我试过这个,但:+ 运算符不起作用。

val counter = session.sparkContext.longAccumulator
val res: Array[(String, Long)] = Array()
df.columns
    .foreach(x => res :+ (x, df.filter{ df(x).isNull || df(x) === "" || df(x).isNaN {counter.add(1); counter.value}}))

有人有什么想法或建议吗?

【问题讨论】:

    标签: scala apache-spark-sql


    【解决方案1】:

    documentation 讨论这个话题:

    累加器不会改变 Spark 的惰性求值模型。如果他们 正在对 RDD 的操作中进行更新,它们的值仅 一旦该 RDD 被计算为操作的一部分,就会更新。最后, 累加器更新不保证在内进行时执行 像 map() 这样的惰性转换。下面的代码片段演示了这个属性:

    val accum = sc.longAccumulator
    data.map { x => accum.add(x); x }
    // Here, accum is still 0 because no actions have caused the map operation to be computed.
    

    从累加器获得可靠结果还有一个问题:

    对于仅在操作内部执行的累加器更新,Spark 保证每个任务对累加器的更新只会 应用一次,即重新启动的任务不会更新该值。在 转换,用户应该知道每个任务的更新可能 如果重新执行任务或作业阶段,则应用不止一次。

    因此,出于这两个原因,如果使用这样的累加器,人们应该更喜欢 foreach 等操作而不是 map 等转换。

    另外,请注意您在列数组上运行 foreach,而不是在 DataFrame 本身上运行 - 然后您在 DataFrame 上重复运行 filter 转换。所以在这种情况下,foreach 根本不是 Spark 动作,它只是 Array 上的一个方法。

    因此,您可能需要在 df.columns 数组上使用 map(以便从函数中返回数组),然后在实际 DataFrame 上使用 foreach 操作(以执行计数)。

    这是一种方法:

    df.columns.map(col => {
      val acc = sc.accumulator(0)
      df.foreach(row => {
        val v = row.getAs[Any](col)
        if (v == null || v == "") acc += 1  // NaN left as an exercise
        }
      )
      (col, acc.value)
    })
    

    但请注意,这总是效率低下,因为我们必须对每一列的 DataFrame 进行传递。一次通过计算所有列(为每一行生成一个元组或计数映射)可能会更有效,然后使用reducefold 或类似方法合并计数,而不是使用计数器。

    【讨论】:

    • 是的,事实上我想过使用foreach 而不是map。但我无法返回正确的类型。你知道怎么做吗?
    • foreach所需的函数不需要返回任何东西;签名是def foreach(func: ForeachFunction[T]): Unit,所以它比map()简单
    • 对不起,我的意思是def foreach(f: (T) ⇒ Unit): Unit
    • 对!只是我需要countNullValueColumn 返回Array[(String,Long)] 其中Longcounter.value 的类型
    • 试试看第四次编辑是不是你之前评论的意思,请@DNA
    猜你喜欢
    • 1970-01-01
    • 2017-11-03
    • 2020-06-13
    • 1970-01-01
    • 2022-08-03
    • 2016-05-22
    • 1970-01-01
    • 1970-01-01
    • 2022-11-14
    相关资源
    最近更新 更多