【问题标题】:Error when broadcasting Joda DateTime in Spark在 Spark 中广播 Joda DateTime 时出错
【发布时间】:2016-06-13 18:46:54
【问题描述】:

在 Spark 中使用 Joda Time 时,下面的代码会导致 java.lang.NullPointerException

val todayBroadcast = sc.broadcast(new DateTime())
val dataRDD2 = dataRDD.filter(item => {
                                todayBroadcast.value.minusMonths(1).isBefore(item._1)
                              })

另一方面,下面的代码没有问题

    val dataRDD2 = dataRDD.filter(item => {
                                    val today = new DateTime()
                                    today.minusMonths(1).isBefore(item._1)
                                  })

【问题讨论】:

    标签: scala apache-spark jodatime


    【解决方案1】:

    据我所知,Joda 对 Apache Spark 提供的默认序列化存在一些问题。尤其是 Kryo 序列化程序的问题。

    你可以去this SO线程看看。

    无论如何,尝试禁用 Kryo 序列化并使用标准 Java 序列化器org.apache.spark.serializer.JavaSerializer。您可以在 Spark 安装的 spark-defaults.conf 中找到属性 spark.serializer

    现在,您应该拥有以下属性:

    spark.serializer=org.apache.spark.serializer.KryoSerializer
    

    你必须改成

    spark.serializer=org.apache.spark.serializer.JavaSerializer
    

    然后,重新启动 Spark 安装。如果您使用的是某些特定发行版(即 Cloudera),请使用他们提供的管理控制台更改上述属性。

    如果您不能使用标准序列化程序,您可以将DateTime 转换为其他序列化友好的格式,例如StringLong(以毫秒为单位的时间)

    让我们知道。

    【讨论】:

    • 您好,问题是标准序列化器太慢了。最坏的情况,我会在过滤器函数中得到日期。
    • 好吧,我可以广播一个字符串,然后将其转换回日期
    • 好的,但是您确认我所说的了吗?您可以以毫秒(Long)为单位广播时间以解决问题
    【解决方案2】:

    如果您想继续使用带有 joda 日期时间的 Kryo 序列化,您可以改为这样做。这使用了来自 https://github.com/magro/kryo-serializers 的已创建序列化程序

    创建一个扩展 KryoRegistrator 的类

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator
    
    class MyRegistrator extends KryoRegistrator {
    
      import de.javakaffee.kryoserializers.jodatime.{JodaDateTimeSerializer, JodaLocalDateSerializer, JodaLocalDateTimeSerializer}
      import org.joda.time.{DateTime, LocalDate, LocalDateTime}
    
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[DateTime], new JodaDateTimeSerializer())
        kryo.register(classOf[LocalDate], new JodaLocalDateSerializer())
        kryo.register(classOf[LocalDateTime], new JodaLocalDateTimeSerializer())
      }
    }
    

    然后使用 sparkconf 注册该类

    set("spark.kryo.registrator", "MyRegistrator")
    

    这将正确序列化 joda 日期时间、本地日期

    查看https://spark.apache.org/docs/0.6.1/tuning.html 的文档 -> 数据序列化

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-07-05
      • 2018-10-27
      • 2011-09-17
      • 1970-01-01
      • 1970-01-01
      • 2018-11-12
      相关资源
      最近更新 更多