Question title: Column type inferred as binary with typed UDAF
Posted: 2019-07-07 09:07:20
Question:

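I'm trying to implement a typed UDAF that returns a complex type. Somehow Spark cannot infer the type of the result column. It makes the column binary and puts the serialized data there. Here is a minimal example that reproduces the problem: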

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{SparkSession, Encoder, Encoders}

case class Data(key: Int)

class NoopAgg[I] extends Aggregator[I, Map[String, Int], Map[String, Int]] {
    override def zero: Map[String, Int] = Map.empty[String, Int]

    override def reduce(b: Map[String, Int], a: I): Map[String, Int] = b

    override def merge(b1: Map[String, Int], b2: Map[String, Int]): Map[String, Int] = b1

    override def finish(reduction: Map[String, Int]): Map[String, Int] = reduction

    override def bufferEncoder: Encoder[Map[String, Int]] = Encoders.kryo[Map[String, Int]]

    override def outputEncoder: Encoder[Map[String, Int]] = Encoders.kryo[Map[String, Int]]
}

object Question {
  def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local").getOrCreate()

      val sc = spark.sparkContext

      import spark.implicits._

      val ds = sc.parallelize((1 to 10).map(i => Data(i))).toDS()

      val noop = new NoopAgg[Data]().toColumn

      val result = ds.groupByKey(_.key).agg(noop.as("my_sum").as[Map[String, Int]])

      result.printSchema()
  }
}

This prints:

root
 |-- value: integer (nullable = false)
 |-- my_sum: binary (nullable = true)

    Tags: scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders


    Answer 1:

    There is no inference happening here at all. You get more or less exactly what you asked for. The specific mistake is here:

    override def outputEncoder: Encoder[Map[String, Int]] = Encoders.kryo[Map[String, Int]]
    

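    Encoders.kryo means that you apply generic serialization and get back a binary blob. The misleading part is .as[Map[String, Int]]: contrary to what one might expect, it is not statically type checked. Worse, it is not even proactively validated by the query planner, and a runtime exception is thrown only when result is evaluated: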

    result.first
    
    org.apache.spark.sql.AnalysisException: cannot resolve '`my_sum`' due to data type mismatch: cannot cast binary to map<string,int>;
      at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:115)
    ...
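To see the difference without running a query, the two encoders' schemas can be compared directly. This is a minimal sketch: EncoderSchemas and columnType are hypothetical names, and it assumes only Encoders.kryo, ExpressionEncoder (both used in this answer) and Encoder.schema. No SparkSession is needed, because building an encoder does not start one.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.DataType

object EncoderSchemas {
  // Generic Kryo serialization: the whole Map becomes one opaque binary value.
  val kryoEnc: Encoder[Map[String, Int]] = Encoders.kryo[Map[String, Int]]

  // Reflection-derived encoder: the Map is stored as a native Spark SQL map.
  val exprEnc: Encoder[Map[String, Int]] = ExpressionEncoder[Map[String, Int]]()

  // Data type of the single column each encoder produces.
  def columnType(enc: Encoder[_]): DataType = enc.schema.head.dataType

  def main(args: Array[String]): Unit = {
    println(s"kryo:       ${columnType(kryoEnc).simpleString}")
    println(s"expression: ${columnType(exprEnc).simpleString}")
  }
}
```

The Kryo encoder should report binary and the expression encoder a map<string,int>. That is the same binary type that ends up in the my_sum column above.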
    

    Instead, you should provide a concrete Encoder, either explicitly

    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder  
    
    def outputEncoder: Encoder[Map[String, Int]] = ExpressionEncoder()
    

    or implicitly

    class NoopAgg[I](implicit val enc: Encoder[Map[String, Int]]) extends Aggregator[I, Map[String, Int], Map[String, Int]] {
      ...
      override def outputEncoder: Encoder[Map[String, Int]] = enc
    }
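A self-contained sketch of the implicit variant follows. The members hidden behind "..." are reused unchanged from the question's NoopAgg. ImplicitEncoderDemo and mapEncoder are hypothetical names. The implicit ExpressionEncoder stands in for the Encoder[Map[String, Int]] that, as far as I know, recent Spark versions also provide via import spark.implicits._ inside an application. The buffer is left Kryo-encoded because it never appears in the result schema.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

case class Data(key: Int)

class NoopAgg[I](implicit val enc: Encoder[Map[String, Int]])
    extends Aggregator[I, Map[String, Int], Map[String, Int]] {
  override def zero: Map[String, Int] = Map.empty[String, Int]
  override def reduce(b: Map[String, Int], a: I): Map[String, Int] = b
  override def merge(b1: Map[String, Int], b2: Map[String, Int]): Map[String, Int] = b1
  override def finish(reduction: Map[String, Int]): Map[String, Int] = reduction
  // Internal buffer only: Kryo here does not leak into the output column type.
  override def bufferEncoder: Encoder[Map[String, Int]] = Encoders.kryo[Map[String, Int]]
  // The caller's implicit encoder decides the output column type.
  override def outputEncoder: Encoder[Map[String, Int]] = enc
}

object ImplicitEncoderDemo {
  // Hypothetical stand-in for the encoder brought in by `import spark.implicits._`.
  implicit val mapEncoder: Encoder[Map[String, Int]] = ExpressionEncoder[Map[String, Int]]()

  // The implicit above is picked up by the constructor's implicit parameter list.
  val agg = new NoopAgg[Data]()

  def main(args: Array[String]): Unit =
    println(agg.outputEncoder.schema.treeString)
}
```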
    

    As a side effect, this makes as[Map[String, Int]] obsolete, because the return type of the Aggregator is already known.
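Below, the explicit variant is plugged back into the question's pipeline as a sketch. The result is then typed as Dataset[(Int, Map[String, Int])] without any .as[...] call. TypedResult and run are hypothetical names. The alias is set with TypedColumn.name, which keeps the column typed. By contrast, the question's .as("my_sum") returns a plain Column, which is why a second .as[...] was needed there.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

case class Data(key: Int)

class NoopAgg[I] extends Aggregator[I, Map[String, Int], Map[String, Int]] {
  override def zero: Map[String, Int] = Map.empty[String, Int]
  override def reduce(b: Map[String, Int], a: I): Map[String, Int] = b
  override def merge(b1: Map[String, Int], b2: Map[String, Int]): Map[String, Int] = b1
  override def finish(reduction: Map[String, Int]): Map[String, Int] = reduction
  override def bufferEncoder: Encoder[Map[String, Int]] = Encoders.kryo[Map[String, Int]]
  // Explicit variant: a reflection-derived encoder instead of Encoders.kryo.
  override def outputEncoder: Encoder[Map[String, Int]] = ExpressionEncoder[Map[String, Int]]()
}

object TypedResult {
  def run(): Dataset[(Int, Map[String, Int])] = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    val ds = (1 to 10).map(Data(_)).toDS()

    // The TypedColumn already carries Map[String, Int], so the declared
    // return type compiles without .as[...]; .name only sets the alias.
    ds.groupByKey(_.key).agg(new NoopAgg[Data]().toColumn.name("my_sum"))
  }

  def main(args: Array[String]): Unit = run().printSchema()
}
```

printSchema should now list my_sum as a map with string keys and integer values instead of binary.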
