【问题标题】:Why does Spark groupBy.agg(min/max) of BigDecimal always return 0?为什么 BigDecimal 的 Spark groupBy.agg(min/max) 总是返回 0?
【发布时间】:2019-02-11 23:42:34
【问题描述】:

我正在尝试按 DataFrame 的一列进行分组,并在每个结果组中生成 BigDecimal 列的 minmax 值。结果总是产生一个非常小的值(大约为 0)。

(类似的 min/max 调用 Double 列会产生预期的非零值。)

举个简单的例子:

如果我创建以下 DataFrame:

import org.apache.spark.sql.{functions => f}

case class Foo(group: String, bd_value: BigDecimal, d_value: Double)

val rdd = spark.sparkContext.parallelize(Seq(
  Foo("A", BigDecimal("1.0"), 1.0),
  Foo("B", BigDecimal("10.0"), 10.0),
  Foo("B", BigDecimal("1.0"), 1.0),
  Foo("C", BigDecimal("10.0"), 10.0),
  Foo("C", BigDecimal("10.0"), 10.0),
  Foo("C", BigDecimal("10.0"), 10.0)
))

val df = rdd.toDF()

在 Double 或 BigDecimal 列中选择 max 会返回预期结果:

df.select(f.max("d_value")).show()

// +------------+
// |max(d_value)|
// +------------+
// |        10.0|
// +------------+

df.select(f.max("bd_value")).show()

// +--------------------+
// |       max(bd_value)|
// +--------------------+
// |10.00000000000000...|
// +--------------------+

但如果我分组然后聚合,我会得到 Double 列的合理结果,但 BigDecimal 列的值接近于零:

df.groupBy("group").agg(f.max("d_value")).show()

// +-----+------------+
// |group|max(d_value)|
// +-----+------------+
// |    B|        10.0|
// |    C|        10.0|
// |    A|         1.0|
// +-----+------------+

df.groupBy("group").agg(f.max("bd_value")).show()

// +-----+-------------+
// |group|max(bd_value)|
// +-----+-------------+
// |    B|     1.00E-16|
// |    C|     1.00E-16|
// |    A|      1.0E-17|
// +-----+-------------+

为什么 spark 对于这些 min/max 调用返回零结果?

【问题讨论】:

  • 尝试一些不同的测试用例有助于优化问题 - f.max 返回的值似乎是正确值乘以 10^-17。例如,更改示例以使C 组中的最大值为123456.0 导致1.234560E-12 作为最大值。因此,我们可以将问题缩小到“为什么在 groupBy 之后指数在 max 中被破坏?”
  • 另一个有趣的花絮:如果我将 RDD 创建为 Rows 的 Seq 并自己创建架构(使用与使用案例创建 DF 的架构中出现的相同 Decimal(38, 18)类),我得到正确的行为。从元组序列中生成 DF 会产生同样的错误行为。所以,现在我们可以问“当我们从具有显式模式的 RDD 的行创建 DataFrame 时会发生什么,这与使用案例类或 Seq 不同,为什么后一种情况会破坏 BigDecimals 中的指数?”

标签: apache-spark apache-spark-sql bigdecimal


【解决方案1】:

TL;DR

Spark 如何处理BigDecimals 的规模似乎不一致,这在问题中显示的特定情况下表现出来。代码的行为就好像它使用BigDecimal 对象的比例将BigDecimals 转换为未缩放的Longs,然后使用模式的比例转换回BigDecimal

这两种方法都可以解决

  • 使用setScale 明确设置所有BigDecimal 值的比例以匹配DataFrame 的架构,或者
  • 手动指定架构并从 RDD[Row] 创建 DF

加长版

这是我认为使用 Spark 2.4.0 在我的机器上发生的事情。

groupBy.max 的情况下,Spark 将通过UnsafeRow 并将BigDecimal 转换为未缩放 Long 并将其作为字节数组存储在setDecimal @987654322 @ 行(已通过打印语句验证)。然后,当它稍后调用getDecimal 时,它使用模式中指定的比例 将字节数组转换回BigDecimal

如果原始值中的比例与架构中的比例不匹配,则会导致值不正确。例如,

val foo = BigDecimal(123456)
foo.scale
0

val bytes = foo.underlying().unscaledValue().toByteArray()

// convert the bytes into BigDecimal using the original scale -- correct value
val sameValue = BigDecimal(new java.math.BigInteger(bytes), 0)
sameValue: scala.math.BigDecimal = 123456

// convert the bytes into BigDecimal using scale 18 -- wrong value
val smaller = BigDecimal(new java.math.BigInteger(bytes), 18)
smaller: scala.math.BigDecimal = 1.23456E-13

如果我只选择bd_value 列的最大值,Spark 似乎不会通过setDecimal。我还没有验证原因,或者它的去向。

但是,这可以解释问题中观察到的值。使用相同的案例类Foo

// This BigDecimal has scale 0
val rdd = spark.sparkContext.parallelize(Seq(Foo("C", BigDecimal(123456), 123456.0)))

// And shows with scale 0 in the DF
rdd.toDF.show
+-----+--------+--------+
|group|bd_value| d_value|
+-----+--------+--------+
|    C|  123456|123456.0|
+-----+--------+--------+

// But the schema has scale 18
rdd.toDF.printSchema
root
 |-- group: string (nullable = true)
 |-- bd_value: decimal(38,18) (nullable = true)
 |-- d_value: double (nullable = false)


// groupBy + max corrupts in the same way as converting to bytes via unscaled, then to BigDecimal with scale 18
rdd.groupBy("group").max("bd_value").show
+-----+-------------+
|group|max(bd_value)|
+-----+-------------+
|    C|  1.23456E-13|
+-----+-------------+

// This BigDecimal is forced to have the same scale as the inferred schema
val rdd = spark.sparkContext.parallelize(Seq(Foo("C",BigDecimal(123456).setScale(18), 123456.0)))

// verified the scale is 18 in the DF
+-----+--------------------+--------+
|group|            bd_value| d_value|
+-----+--------------------+--------+
|    C|123456.0000000000...|123456.0|
+-----+--------------------+--------+


// And it works as expected
rdd1.groupBy("group").max("bd_value").show

+-----+--------------------+
|group|       max(bd_value)|
+-----+--------------------+
|    C|123456.0000000000...|
+-----+--------------------+

这也可以解释为什么,正如评论中所观察到的,当从具有显式架构的 RDD[Row] 转换时,它可以正常工作。

val rdd2 = spark.sparkContext.parallelize(Seq(Row("C", BigDecimal(123456), 123456.0)))

// schema has BigDecimal scale 18
val schema = StructType(Seq(StructField("group", StringType, true), StructField("bd_value", DecimalType(38,18), true), StructField("d_value",DoubleType,false)))

// createDataFrame interprets the value into the schema's scale
val df = spark.createDataFrame(rdd2, schema)

df.show

+-----+--------------------+--------+
|group|            bd_value| d_value|
+-----+--------------------+--------+
|    C|123456.0000000000...|123456.0|
+-----+--------------------+--------+

【讨论】:

    猜你喜欢
    • 2021-01-24
    • 2016-09-16
    • 2020-05-25
    • 2013-02-26
    • 2013-06-25
    • 1970-01-01
    • 1970-01-01
    • 2013-02-14
    相关资源
    最近更新 更多