【问题标题】:Why do year and month functions result in long overflow in Spark?为什么年月函数会导致 Spark 中长溢出?
【发布时间】:2021-12-16 22:27:33
【问题描述】:

我正在尝试从 spark 中名为 logtimestamp(TimeStampType 类型)的列创建年和月列。数据源是 cassandra。我正在使用 sparkshell 执行这些步骤,这是我编写的代码 -

import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.types._
var logsDF = spark.read.cassandraFormat("tableName", "cw").load()
var newlogs = logsDF.withColumn("year", year(col("logtimestamp")))
 .withColumn("month", month(col("logtimestamp")))
newlogs.write.cassandraFormat("tableName_v2", "cw")
 .mode("Append").save()

但是这些步骤都不成功,我最终出现以下错误

java.lang.ArithmeticException: long overflow
    at java.lang.Math.multiplyExact(Math.java:892)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.millisToMicros(DateTimeUtils.scala:205)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.fromJavaTimestamp(DateTimeUtils.scala:166)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$TimestampConverter$.toCatalystImpl(CatalystTypeConverters.scala:327)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$TimestampConverter$.toCatalystImpl(CatalystTypeConverters.scala:325)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:107)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:252)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:242)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:107)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$.$anonfun$createToCatalystConverter$2(CatalystTypeConverters.scala:426)
    at com.datastax.spark.connector.datasource.UnsafeRowReader.read(UnsafeRowReaderFactory.scala:34)
    at com.datastax.spark.connector.datasource.UnsafeRowReader.read(UnsafeRowReaderFactory.scala:21)
    at com.datastax.spark.connector.datasource.CassandraPartitionReaderBase.$anonfun$getIterator$2(CassandraScanPartitionReaderFactory.scala:110)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:494)
    at com.datastax.spark.connector.datasource.CassandraPartitionReaderBase.next(CassandraScanPartitionReaderFactory.scala:66)
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
    at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:413)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

我认为这与表中的空值有关,所以我运行了以下内容

scala> logsDF.filter("logtimestamp is null").show()

但这也给出了同样的长溢出错误。

当两者都有 8 个字节的时间戳时,为什么会在 spark 中出现溢出但在 cassandra 中没有溢出? 这可能是什么问题以及如何正确地从时间戳中提取年份和月份?

【问题讨论】:

  • 您能否分享一个来自logtimestamp 列的值示例?
  • @Gabip 是的,这是 logtimestamp 的一些示例(如 spark 所示)- 2021-03-04 10:29:59.311, 2021-03-04 10:29:59.014, 2021 -05-03 21:29:56.699。根据 logsDF.dtypes,它们的类型是 TimestampType
  • @mazaneicha 哦,这可能是问题吗?但我将 spark 3.1.2 与 scala 2.12.10 一起使用。该链接没有提到任何关于 3.1.2
  • 它提到该问题已在 3.1.3 中修复

标签: scala apache-spark cassandra spark-cassandra-connector


【解决方案1】:

原来 cassandra 表中的一个时间戳值大于 spark 允许的最大值,但不足以在 cassandra 中溢出。时间戳已被手动编辑以绕过默认情况下在 cassandra 中完成的更新插入,但这导致在开发过程中形成了一些较大的值。 运行一个 python 脚本来找出这个问题。

【讨论】:

  • 不错的发现!您可以添加导致此问题的示例值吗?
  • @mazaneicha 不幸的是我删除了这些行。这只是一个非常大的数字。虽然 Cassandra 以 yyyy-MM-hh... 格式显示了其他正确的时间戳,但它按原样显示了数字。
  • Aaahhh 读取模式的精妙之处?
  • @mazaneicha 是的,我认为就是这样。尝试了很多来为 cassandra 连接器禁用它并在调试时使用 BigInt 作为时间戳,但没有成功。
猜你喜欢
  • 2011-12-07
  • 1970-01-01
  • 2018-06-21
  • 1970-01-01
  • 2020-09-11
  • 1970-01-01
  • 2014-12-20
  • 2018-10-11
  • 1970-01-01
相关资源
最近更新 更多