【问题标题】:Where to define the Object to Broadcast in Spark Java在 Spark Java 中定义广播对象的位置
【发布时间】:2017-05-26 10:36:17
【问题描述】:

我有一个数据库对象,用于插入来自所有 Spark 执行器的数据。当我将此对象定义为static 时,它在这些执行程序中具有null 值。所以我在驱动程序中声明它,广播它然后在每个执行程序中获取它的值。当我运行应用程序时,抛出以下异常:

Exception in thread "main" java.io.NotSerializableException: database.Database

注意事项:

  • 执行器类是可序列化的
  • 广播对象在该类中被定义为瞬态
  • 我删除了瞬态,但它不起作用

【问题讨论】:

  • 数据库对象是什么意思? DTO 还是别的什么?
  • 请查看How to create a Minimal, Complete, and Verifiable example 并相应地改写您的问题。
  • 我创建了一个类来处理连接到数据库和所有数据库交互。
  • 您不能序列化数据库连接。它正在正确地大喊java.io.NotSerializableException
  • 好。这就是为什么我使用广播,在所有执行者之间共享这个对象。鉴于我得到了这个异常,这是否意味着广播变量应该是可序列化的?

标签: java apache-spark


【解决方案1】:

我是这样解释你的问题的:

我想从我的 RDD 中插入来自所有 Spark 执行器的数据。我试图在驱动程序上创建一个数据库连接,并以某种方式将其作为广播传递给执行程序,但 Spark 不断抛出NotSerializableException。我怎样才能实现我的目标?

简短的回答是:

您应该在每个执行程序节点上分别创建一个新连接。
您不应将数据库连接处理程序、文件处理程序等传递给其他进程,尤其是远程机器。

这里的问题是在哪里创建数据库连接,因为有大量的执行器很容易超过数据库的连接池大小。

你实际上可以做的是使用foreachPartition,就像这里:

  // numPartitions == number of simultaneous DB connections you can afford
  yourRdd.repartition(numPartitions)
  .foreachPartition {
    iter =>
      val connection = createConnection()
      while (iter.hasNext) {
        connection.execute("INSERT ...")
      }
      connection.commit()
  }

这里.foreachPartition里面的代码会在每台executor机器上执行,连接对象不会通过网络发送,不会出现序列化异常,数据会被插入。

this 问题的答案中也提到了使用 foreachPartition 的相同推理。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-24
    • 1970-01-01
    • 2015-04-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多