【问题标题】:Strange "Task not serializable" with SparkSpark 出现奇怪的“任务不可序列化”
【发布时间】:2023-04-11 04:44:02
【问题描述】:

在我的程序中,我有一个返回一些 RDD 的方法,我们称它为 myMethod,它接受一个不可序列化的参数,让 RDD 的类型为 Long(我真正的 RDD 是 Tuple 类型,但仅包含原始类型)。

当我尝试这样的事情时:

val x: NonSerializableThing = ...
val l: Long = ...
myMethod(x, l).map(res => res + l) // myMethod's RDD does NOT include the NonSerializableThing

我收到Task not serializable

当我将 res + l 替换为 res + 1L(即某个常量)时,它会运行。

从序列化跟踪中,它尝试序列化 NonSerializableThing 并在那里阻塞,但我仔细检查了我的方法,这个对象从未出现在 RDD 中。

当我尝试直接收集myMethod 的输出时,即使用

myMethod(x, l).take(1) foreach println

我也没有问题。

该方法使用NonSerializableThing 获取(本地)值序列,在该值上进行多个 Cassandra 查询(这是必需的,因为我需要构造要查询的分区键),如下所示:

def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = {
  val someParam1: String = x.someProperty
  x.getSomeSeq.flatMap(y: OtherNonSerializableThing => {
    val someParam2: String = y.someOtherProperty
    y.someOtherSeq.map(someParam3: String =>
      sc.cassandraTable("fooKeyspace", "fooTable").
      select("foo").
      where("bar=? and quux=? and baz=? and l=?", someParam1, someParam2, someParam3, l).
      map(_.getLong(0))
  }.reduce((a, b) => a.union(b))
}

getSomeSeqsomeOtherSeq 返回纯非火花 Seqs

我想要实现的是“联合”多个 Cassandra 查询。

这可能是什么问题?

编辑,附录,应 Jem Tucker 的要求:

我在课堂上的内容是这样的:

implicit class MySparkExtension(sc: SparkContext) {

  def getThing(/* some parameters */): NonSerializableThing = { ... }

  def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = {
    val someParam1: String = x.someProperty
    x.getSomeSeq.flatMap(y: OtherNonSerializableThing => {
      val someParam2: String = y.someOtherProperty
      y.someOtherSeq.map(someParam3: String =>
        sc.cassandraTable("fooKeyspace", "fooTable").
        select("foo").
        where("bar=? and quux=? and baz=? and l=?", someParam1, someParam2, someParam3, l).
        map(_.getLong(0))
    }.reduce((a, b) => a.union(b))
  }
}

这是在包对象中声明的。问题出现在这里:

// SparkContext is already declared as sc
import my.pkg.with.extension._

val thing = sc.getThing(/* parameters */)
val l = 42L
val rdd = sc.myMethod(thing, l)
// until now, everything is OK.
// The following still works:
rdd.take(5) foreach println
// The following causes the exception:
rdd.map(x => x >= l).take(5) foreach println
// While the following works:
rdd.map(x => x >= 42L).take(5) foreach println

我在 Spark shell 以及通过spark-submit 提交的算法中测试了这个“实时”输入。

我现在想尝试的(根据我的最后一条评论)如下:

implicit class MySparkExtension(sc: SparkContext) {

  def getThing(/* some parameters */): NonSerializableThing = { ... }

  def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = {
    val param1 = x.someProperty
    val partitionKeys =
      x.getSomeSeq.flatMap(y => {
        val param2 = y.someOtherProperty
        y.someOtherSeq.map(param3 => (param1, param2, param3, l)
      }
    queryTheDatabase(partitionKeys)
  }

  private def queryTheDatabase(partitionKeys: Seq[(String, String, String, Long)]): RDD[Long] = {
    partitionKeys.map(k =>
      sc.cassandraTable("fooKeyspace", "fooTable").
         select("foo").
         where("bar=? and quux=? and baz=? and l=?", k._1, k._2, k._3, k._4).
         map(_.getLong(0))
    ).reduce((a, b) => a.union(b))
  }
}

我相信这可以工作,因为 RDD 现在是在 queryTheDatabase 方法中构造的,而 NonSerializableThing 不存在。

另一个选项可能是:NonSerializableThing 确实是可序列化的,但我将SparkContext 作为隐式构造函数参数传入其中。我想如果我让这个暂时的,它会(无用地)被序列化但不会造成任何问题。

【问题讨论】:

  • 请发帖mymethod 或者至少是它的签名。
  • 我仍然没有看到def mymethod(...) ... 。 2. 你的对象住在哪里,它们的上下文是什么?
  • 其中有一个错字。现已更正。
  • a.union(b)RDDs 进行操作,因此 Spark 将整个方法序列化。我建议您以单独的方法执行union
  • 是的,我知道。这就是为什么我之前准备好一切,并且只使用a.union(b)作为最后一步,但下面的答案似乎朝着正确的方向发展。

标签: scala apache-spark


【解决方案1】:

当您将l 替换为1L 时,Spark 不再尝试使用方法/变量来序列化类,因此不会引发错误。

您应该能够通过将val x: NonSerializableThing = ... 标记为瞬态来修复,例如

@transient
val x: NonSerializableThing = ...

这意味着当类被序列化时,这个变量应该被忽略。

【讨论】:

  • 通过参数传入x: NonSerialiyableThing是否也可以声明为瞬态?或者如果我从那时起使用@transient val x1: NonSerializableThing = x 并使用x1 就足够了吗?
  • 如果您的意思是在类构造函数中作为参数传入,那么可以。您可以发布包含此代码的完整课程定义吗?
  • 很遗憾我不能,因为我不允许发布公司代码。明天我将尝试以下操作:构造包含所有分区键的Seq(其中只有字符串、长整数等),并将这个(并且只有这个)传递给在集群上执行 Cassandra 查询的私有方法。我认为这可能是一个可行的解决方法,因为在构建 RDD 时,范围内没有 NonSerializableThing
  • 我稍微扩展了我的第一篇文章以显示类的结构以及发生这种情况的一些示例代码。
  • 啊,我明白了,我以为NonSerializableThing 是班级成员。我测试了瞬态参数,它们似乎有效,所以我会试一试。只需将方法更改为def mymethod(@transient n: Non..., l: Long)
猜你喜欢
  • 2018-04-06
  • 2021-03-16
  • 1970-01-01
  • 2015-12-16
  • 2017-03-21
  • 2016-12-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多