【问题标题】:Sparklyr's spark_apply function seems to run on single executor and fails on moderately-large datasetSparklyr 的 spark_apply 函数似乎在单个 executor 上运行,在中等大小的数据集上失败
【发布时间】:2018-03-05 22:09:21
【问题描述】:

我正在尝试使用spark_apply 在 Spark 表上运行下面的 R 函数。如果我的输入表很小(例如 5,000 行),这可以正常工作,但是当表中等大(例如 5,000,000 行)时,大约 30 分钟后会引发错误: sparklyr worker rscript failure, check worker logs for details

查看 Spark UI 可以发现,只创建了一个任务,并且将一个执行器应用于此任务。

谁能就为什么这个函数在 500 万行数据集上失败? 问题可能是让一个执行器来完成所有工作,但失败了?

# Create data and copy to Spark
testdf <- data.frame(string_id=rep(letters[1:5], times=1000), # 5000 row table
                 string_categories=rep(c("", "1", "2 3", "4 5 6", "7"), times=1000))
testtbl <- sdf_copy_to(sc, testdf, overwrite=TRUE, repartition=100L, memory=TRUE)

# Write function to return dataframe with strings split out
myFunction <- function(inputdf){
  inputdf$string_categories <- as.character(inputdf$string_categories)
  inputdf$string_categories=with(inputdf, ifelse(string_categories=="", "blank", string_categories))
  stringCategoriesList <- strsplit(inputdf$string_categories, ' ')
  outDF <- data.frame(string_id=rep(inputdf$string_id, times=unlist(lapply(stringCategoriesList, length))),
                  string_categories=unlist(stringCategoriesList))
 return(outDF)
}

# Use spark_apply to run function in Spark
outtbl <- testtbl %>%
  spark_apply(myFunction,
          names=c('string_id', 'string_categories'))
outtbl

【问题讨论】:

    标签: r apache-spark sparklyr


    【解决方案1】:
    1. sparklyr worker rscript failure, check worker logs for details错误是驱动节点写的,指出实际的错误需要在worker日志中找到。通常,可以通过在 Spark UI 的 executor 选项卡中打开 stdout 来访问 worker 日志;日志应包含 RScript: 条目,描述执行程序正在处理的内容以及错误的具体情况。

    2. 对于正在创建的单个任务,当columns 没有指定spark_apply() 中的类型时,它需要计算结果的子集来猜测列类型,为避免这种情况,请按如下方式传递显式列类型:

      outtbl <- testtbl %>% spark_apply( myFunction, columns=list( string_id = "character", string_categories = "character"))

    3. 1234563
    4. 在高负载下,内存不足是很常见的。增加分区数量可以解决此问题,因为它会减少处理此数据集所需的总内存。尝试将其运行为:

      testtbl %>% sdf_repartition(1000) %>% spark_apply(myFunction, names=c('string_id', 'string_categories'))

    5. 也可能是由于函数中的逻辑,函数对某些分区抛出异常,您可以通过使用tryCatch()忽略错误然后查找是不是这种情况缺失值以及为什么函数会因这些值而失败。我会从以下内容开始:

      myFunction <- function(inputdf){ tryCatch({ inputdf$string_categories <- as.character(inputdf$string_categories) inputdf$string_categories=with(inputdf, ifelse(string_categories=="", "blank", string_categories)) stringCategoriesList <- strsplit(inputdf$string_categories, ' ') outDF <- data.frame(string_id=rep(inputdf$string_id, times=unlist(lapply(stringCategoriesList, length))), string_categories=unlist(stringCategoriesList)) return(outDF) }, error = function(e) { return( data.frame(string_id = c(0), string_categories = c("error")) ) }) }

    【讨论】:

    • 感谢您提供如此全面的回答!增加分区的数量解决了这个问题,但也有很多额外的信息可以帮助我继续前进。
    • 也提高了这个不错的答案
    猜你喜欢
    • 2018-11-17
    • 1970-01-01
    • 2021-07-23
    • 1970-01-01
    • 1970-01-01
    • 2018-11-19
    • 2018-03-30
    • 2019-03-23
    • 2020-03-05
    相关资源
    最近更新 更多