【问题标题】:Spark 1.5.2 DataFramesUDF Avoid Race Condition upon Object Re-UsageSpark 1.5.2 DataFramesUDF 在对象重用时避免竞争条件
【发布时间】:2017-05-28 19:17:07
【问题描述】:

这里的问题是如何重用 UDF 的对象但避免竞争条件?

我在我的 spark 应用程序中使用 UDF,并且由于竞争条件,单元测试似乎不确定。有时通过有时失败...

为了提高效率,我试图通过创建对象并将它们传递给 UDF 来强制重用它们。然而,共享相同 spark 上下文和 JVM 的单独“测试”似乎正在使用这些对象并导致错误。

def reformatDate(input:String,sdfIn:SimpleDateFormat,sdfOut:SimpleDateFormat): String ={
    sdfOut.format(sdfIn.parse(input))
  }

  val datePartitionFormat = new SimpleDateFormat("yyyyMMdd")
  val dTStampFormat = new SimpleDateFormat("yyyy/MM/dd")
  val validDateFormat = new SimpleDateFormat("yyyy-MM-dd")

  val partitionToDateUDF = udf(reformatDate(_:String,datePartitionFormat,validDateFormat))
  val dTStampToDateUDF= udf(reformatDate(_:String,dTStampFormat,validDateFormat))

有时当我运行我的单元测试时,我会收到以下错误:

17/01/13 11:45:45 错误执行程序:阶段 2.0 中任务 0.0 中的异常 (TID 2) java.lang.NumberFormatException: 多个点 sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1890) 在 sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110) 在 java.lang.Double.parseDouble(Double.java:538) 在 java.text.DigitList.getDouble(DigitList.java:169) 在 java.text.DecimalFormat.parse(DecimalFormat.java:2056) 在 java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1867) 在 java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514) 在 java.text.DateFormat.parse(DateFormat.java:364) 在 com.baesystems.ai.engineering.threatanalytics.microbatch.processor.transformers.metric.mDnsPreviousSeenDomainsStartOfDayDF$.reformatDate(mDnsPreviousSeenDomainsStartOfDayDF.scala:22)

我是这样使用函数的:

val df = df2
  .filter(
    datediff(
      to_date(partitionToDateUDF($"dt"))
      ,to_date(dTStampToDate($"d_last_seen"))
    ) < 90
  )

在调试时发现输入“df2”是:

+-----------+--------+-------------------------+--------------------------------+
|d_last_seen|      dt|partitionToDateUDF($"dt")|dTStampToDateUDF($"d_last_seen")|
+-----------+--------+-------------------------+--------------------------------+
| 2016/11/02|20161102|2016-11-02               |2016-11-02                      |
| 2016/11/01|20161102|2016-11-02               |2016-11-01                      |
+-----------+--------+-------------------------+--------------------------------+

我使用 conf.setMaster("local[2]"),可能是 spark 使用线程,因此在本地运行时共享相同的 JVM,但是部署时不会发生这种情况,因为单独的执行程序将有自己的JVM 以及它们自己的对象实例化?

【问题讨论】:

    标签: scala apache-spark dataframe spark-dataframe race-condition


    【解决方案1】:

    SimpleDateFormat 不是线程安全的(参见例如Why is Java's SimpleDateFormat not thread-safe?)。这意味着,如果您在任何 UDF 中使用它(甚至在单个 Spark 作业中),您可能会得到意想不到的结果,因为 spark 将在多个 tasks 中使用您的 UDF,这些任务在不同的 线程 上运行strong> 最终导致多个线程同时访问它。这对于本地模式和实际分布式集群都是如此 - 每个执行程序上的多个线程将使用一个副本。

    要克服这个问题 - 只需使用 线程安全的不同格式化程序,例如乔达的DateTimeFormatter

    【讨论】:

    • 谢谢 Tzach,我想补充一点,这里对整个问题的回答是:您必须在 Spark UDF 中保持线程安全,因为多个任务在每个执行程序的多个线程上运行。 Tzach 为我提供的解决方案是线程安全的。
    猜你喜欢
    • 1970-01-01
    • 2015-01-30
    • 2010-09-25
    • 1970-01-01
    • 1970-01-01
    • 2010-09-25
    • 2019-06-12
    • 1970-01-01
    相关资源
    最近更新 更多