【发布时间】:2023-04-11 04:44:02
【问题描述】:
在我的程序中,我有一个返回一些 RDD 的方法,我们称它为 myMethod,它接受一个不可序列化的参数,让 RDD 的类型为 Long(我真正的 RDD 是 Tuple 类型,但仅包含原始类型)。
当我尝试这样的事情时:
val x: NonSerializableThing = ...
val l: Long = ...
myMethod(x, l).map(res => res + l) // myMethod's RDD does NOT include the NonSerializableThing
我收到Task not serializable。
当我将 res + l 替换为 res + 1L(即某个常量)时,它会运行。
从序列化跟踪中,它尝试序列化 NonSerializableThing 并在那里阻塞,但我仔细检查了我的方法,这个对象从未出现在 RDD 中。
当我尝试直接收集myMethod 的输出时,即使用
myMethod(x, l).take(1) foreach println
我也没有问题。
该方法使用NonSerializableThing 获取(本地)值序列,在该值上进行多个 Cassandra 查询(这是必需的,因为我需要构造要查询的分区键),如下所示:
def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = {
val someParam1: String = x.someProperty
x.getSomeSeq.flatMap(y: OtherNonSerializableThing => {
val someParam2: String = y.someOtherProperty
y.someOtherSeq.map(someParam3: String =>
sc.cassandraTable("fooKeyspace", "fooTable").
select("foo").
where("bar=? and quux=? and baz=? and l=?", someParam1, someParam2, someParam3, l).
map(_.getLong(0))
}.reduce((a, b) => a.union(b))
}
getSomeSeq 和 someOtherSeq 返回纯非火花 Seqs
我想要实现的是“联合”多个 Cassandra 查询。
这可能是什么问题?
编辑,附录,应 Jem Tucker 的要求:
我在课堂上的内容是这样的:
implicit class MySparkExtension(sc: SparkContext) {
def getThing(/* some parameters */): NonSerializableThing = { ... }
def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = {
val someParam1: String = x.someProperty
x.getSomeSeq.flatMap(y: OtherNonSerializableThing => {
val someParam2: String = y.someOtherProperty
y.someOtherSeq.map(someParam3: String =>
sc.cassandraTable("fooKeyspace", "fooTable").
select("foo").
where("bar=? and quux=? and baz=? and l=?", someParam1, someParam2, someParam3, l).
map(_.getLong(0))
}.reduce((a, b) => a.union(b))
}
}
这是在包对象中声明的。问题出现在这里:
// SparkContext is already declared as sc
import my.pkg.with.extension._
val thing = sc.getThing(/* parameters */)
val l = 42L
val rdd = sc.myMethod(thing, l)
// until now, everything is OK.
// The following still works:
rdd.take(5) foreach println
// The following causes the exception:
rdd.map(x => x >= l).take(5) foreach println
// While the following works:
rdd.map(x => x >= 42L).take(5) foreach println
我在 Spark shell 以及通过spark-submit 提交的算法中测试了这个“实时”输入。
我现在想尝试的(根据我的最后一条评论)如下:
implicit class MySparkExtension(sc: SparkContext) {
def getThing(/* some parameters */): NonSerializableThing = { ... }
def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = {
val param1 = x.someProperty
val partitionKeys =
x.getSomeSeq.flatMap(y => {
val param2 = y.someOtherProperty
y.someOtherSeq.map(param3 => (param1, param2, param3, l)
}
queryTheDatabase(partitionKeys)
}
private def queryTheDatabase(partitionKeys: Seq[(String, String, String, Long)]): RDD[Long] = {
partitionKeys.map(k =>
sc.cassandraTable("fooKeyspace", "fooTable").
select("foo").
where("bar=? and quux=? and baz=? and l=?", k._1, k._2, k._3, k._4).
map(_.getLong(0))
).reduce((a, b) => a.union(b))
}
}
我相信这可以工作,因为 RDD 现在是在 queryTheDatabase 方法中构造的,而 NonSerializableThing 不存在。
另一个选项可能是:NonSerializableThing 确实是可序列化的,但我将SparkContext 作为隐式构造函数参数传入其中。我想如果我让这个暂时的,它会(无用地)被序列化但不会造成任何问题。
【问题讨论】:
-
请发帖
mymethod或者至少是它的签名。 -
我仍然没有看到
def mymethod(...) ...。 2. 你的对象住在哪里,它们的上下文是什么? -
其中有一个错字。现已更正。
-
a.union(b)对RDDs 进行操作,因此 Spark 将整个方法序列化。我建议您以单独的方法执行union。 -
是的,我知道。这就是为什么我之前准备好一切,并且只使用
a.union(b)作为最后一步,但下面的答案似乎朝着正确的方向发展。
标签: scala apache-spark