【问题标题】:Scala task not serializableScala 任务不可序列化
【发布时间】:2018-09-19 16:02:00
【问题描述】:

我有以下代码:-

case class event(imei: String, date: String, gpsdt: String,  entrygpsdt: String,lastgpsdt: String)

object recalculate extends Serializable {
def main(args: Array[String]) {
  val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("RecalculateOdo")
  .set("spark.cassandra.connection.host", "192.168.0.78")
  .set("spark.cassandra.connection.keep_alive_ms", "20000")

 val sc = SparkContext.getOrCreate(conf)

 val rdd = sc.cassandraTable("db", "table").select("imei", "date", "gpsdt").where("imei=? and date=? and gpsdt>? and gpsdt<?", entry(0), entry(1), entry(2), entry(3))
var lastgpsdt = "2018-04-06 10:10:10"
 rdd.foreach(f => 
      {

      val imei = f.get[String]("imei")
      val date = f.get[String]("date")
      val gpsdt = f.get[String]("gpsdt")
      val now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime())
      val collection = sc.parallelize(Seq(event(imei, date, gpsdt,now,lastgpsdt)))
      collection.saveToCassandra("db", "table", SomeColumns("imei", "date", "gpsdt", "entrygpsdt","lastgpsdt")
      lastgpsdt = gpsdt
      })
 }
}

每当我尝试运行代码时,都会收到 Task serializable 错误:-

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)

请提出建议,谢谢,

【问题讨论】:

  • 你在哪里声明了 sparkContext 变量?这个问题似乎是因为“事件”。
  • @ShrinivasDeshmukh - 请重新检查,我已经用 sc 详细信息编辑了问题。
  • 我应该在“事件”案例类中进行哪些更改才能解决它?
  • 如果我跳过事件类并写 val collection = sc.parallelize(Seq(imei, date, gpsdt,now)) ,那么它也会给出同样的错误
  • 我分享的链接解释了如何在所有工作节点上声明和使用对象,请看一下!

标签: scala apache-spark exception serialization spark-cassandra-connector


【解决方案1】:

SparkContext 不可序列化。您应该从驱动程序本身访问它。 而不是rdd.foreach 使用rdd.map 并返回event(imei, date, gpsdt,now)
然后将此结果保存到 Cassandra。比如:

val eventsRdd = rdd.map { f => 
  val imei = f.get[String]("imei")
  val date = f.get[String]("date")
  val gpsdt = f.get[String]("gpsdt")
  val now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime())
  event(imei, date, gpsdt,now)
}
eventsRdd.saveToCassandra("db", "table", SomeColumns("imei", "date", "gpsdt", "entrygpsdt"))

另一方面,如果您有很多事件,我会考虑不创建日期格式化程序并计算每个事件的当前时间。您可以在开始计算之前执行一次(或每个分区至少执行一次 - 请参阅 mapPartitions)。

【讨论】:

  • 嗨,它有效,但实际上如果你再次检查我的问题,我确实有一些字段要在创建事件后保存到我将如何做到这一点,在你的回答中,事件声明应该是最后一个一个,但我想存储要在下一个条目中发送的 lastgpsdt。
  • 您在我发布答案后对其进行了编辑 :) lastgpsdt 实际上代表什么?恐怕如果您执行上述尝试,由于并发问题,您将无法获得预期的结果。但是,如果你想要最大 gpsdt,你可以投影 eventsRdd 给你 gpsdt 并取​​最大值:所以它应该类似于eventsRdd.map(_. gpsdt).max()
  • 那么,让我来解释一下我要做什么。我将遍历 Cassandra 行并将当前行的 gpsdt 保存到下一行,这将表示为名称为“lastgpsdt”的上一行的 gpsdt ..你对我的查询清楚吗?
  • 所以,基本上每一行都会有 2 列——“gpsdt”是该条目的时间,而“lastgpsdt”实际上是前一行的“gpsdt”
  • 所以,我目前的想法是创建rdd并将其保存到cassandra,然后将gpsdt存储到“lastgpsdt”变量中,以便在下一个循环中使用
猜你喜欢
  • 2015-12-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-08-14
  • 1970-01-01
  • 2017-09-21
  • 2020-07-01
  • 2021-08-12
相关资源
最近更新 更多