【Posted】: 2018-09-19 16:02:00
【Question】:
I have the following code:
case class event(imei: String, date: String, gpsdt: String, entrygpsdt: String, lastgpsdt: String)
object recalculate extends Serializable {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("RecalculateOdo")
      .set("spark.cassandra.connection.host", "192.168.0.78")
      .set("spark.cassandra.connection.keep_alive_ms", "20000")
    val sc = SparkContext.getOrCreate(conf)
    val rdd = sc.cassandraTable("db", "table")
      .select("imei", "date", "gpsdt")
      .where("imei=? and date=? and gpsdt>? and gpsdt<?", entry(0), entry(1), entry(2), entry(3))
    var lastgpsdt = "2018-04-06 10:10:10"
    rdd.foreach(f => {
      val imei = f.get[String]("imei")
      val date = f.get[String]("date")
      val gpsdt = f.get[String]("gpsdt")
      val now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime())
      val collection = sc.parallelize(Seq(event(imei, date, gpsdt, now, lastgpsdt)))
      collection.saveToCassandra("db", "table", SomeColumns("imei", "date", "gpsdt", "entrygpsdt", "lastgpsdt"))
      lastgpsdt = gpsdt
    })
  }
}
Whenever I try to run the code, I get a "Task not serializable" error:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
Please advise. Thanks.
【Comments】:
-
Where did you declare the sparkContext variable? The problem seems to be caused by "event".
-
@ShrinivasDeshmukh - Please check again; I have edited the question to include the sc details.
-
What should I change in the "event" case class to fix it?
-
If I skip the event class and write val collection = sc.parallelize(Seq(imei, date, gpsdt, now)), it gives the same error as well.
-
The link I shared explains how to declare and use objects on all worker nodes. Please take a look!
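-
A likely cause, going by the code in the question: sc is used inside rdd.foreach, so Spark has to serialize the closure together with the SparkContext, which is not serializable. That would also explain why dropping the event class changes nothing. Separately, a var such as lastgpsdt that is reassigned inside the closure is only updated on the executors' copies, not on the driver. Below is a minimal sketch of one possible workaround, not a definitive fix. It assumes the filtered rows are few enough to bring back to the driver and are already ordered by gpsdt. The names Event, PreviousGpsdt and withPrevious are hypothetical and appear nowhere in the original code.

```scala
// Hypothetical helper (not from the question): builds every record on the
// driver, so no SparkContext is ever referenced inside an RDD closure.
final case class Event(imei: String, date: String, gpsdt: String,
                       entrygpsdt: String, lastgpsdt: String)

object PreviousGpsdt {
  /**
   * rows: (imei, date, gpsdt) triples, assumed already sorted by gpsdt.
   * seed: value stored as lastgpsdt for the first row.
   * now:  timestamp stored as entrygpsdt for every row.
   */
  def withPrevious(rows: Seq[(String, String, String)],
                   seed: String,
                   now: String): Seq[Event] = {
    // previous(i) is the gpsdt of row i - 1, or the seed for i = 0.
    val previous = seed +: rows.map(_._3)
    // zip stops at the shorter side, so the extra trailing element is dropped.
    rows.zip(previous).map { case ((imei, date, gpsdt), last) =>
      Event(imei, date, gpsdt, now, last)
    }
  }
}
```

On the driver, the rows could then come from rdd.collect(), mapped to (imei, date, gpsdt) triples with f.get[String], and the result written in one call, for example sc.parallelize(PreviousGpsdt.withPrevious(rows, lastgpsdt, now)).saveToCassandra("db", "table", SomeColumns("imei", "date", "gpsdt", "entrygpsdt", "lastgpsdt")). For result sets too large to collect, this approach would not be suitable.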
Tags: scala apache-spark exception serialization spark-cassandra-connector