【问题标题】:How to convert RDD to spark dataframe using sparklyr?如何使用 sparklyr 将 RDD 转换为 spark 数据帧?
【发布时间】:2023-02-09 22:50:31
【问题描述】:

我有很多文件,其中包含由 azure IOT 推送到很多文件夹中的 blob 存储上的文本数据,我想读取它们并有一个 delta lake 表,文件的每一行都有一行。我以前是一个文件一个文件地读,但是太费时间了,所以我想用spark来加速这个处理。它需要集成一个用 R 编写的数据块工作流。

我找到了spark_read_text函数来读取文本文件,但它不能递归读取目录,它只能理解是否所有文件都在一个目录中。

下面是一个文件路径的例子(appid/partition/year/month/day/hour/minute/file): app_id/10/2023/02/06/08/42/gdedir22hccjq

Partition是azure IoT似乎为了并行处理数据而创建的随机文件夹(目前大约有30个),因此可以将同一日期的数据拆分在多个文件夹中,这并没有简化读取效率。

所以我发现唯一能做到这一点的函数是spark.textFile,它与小丑一起工作并递归处理目录。唯一的问题是它返回一个 RDD,我找不到将其转换为 spark 数据帧的方法,最终可以使用 tbl_spark R 对象访问它。

这是我到目前为止所做的:

您需要将配置设置为递归读取文件夹(这里我在专用 python 单元格中对数据块执行此操作):

%py
sc._jsc.hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true")

然后我可以创建一个 RDD:

j_rdd <- spark_context(sc) %>%
  invoke("textFile", "/mnt/my_cont/app_id/*/2022/11/17/*", 10L)

这是创建 RDD 的工作,如您所见,我可以用“*”映射所有分区(年份之前),以及在末尾用“*”递归地映射四个小时和分钟的文件夹。

我可以收集它并创建一个 R 数据框:

lst <- invoke(j_rdd, "collect")
data.frame(row = unlist(lst))

这正确地获取了我的数据,每个文件的每一行都有一列文本和一行(出于隐私原因我无法显示示例,但这并不重要)。

问题是我不想收集,但想用这些数据更新一个增量表,但找不到一种方法来获取我可以使用的 sparklyr 对象。我拿到的j_rdd对象是这样的:

>j_obj
<jobj[2666]>
  org.apache.spark.rdd.MapPartitionsRDD
  /mnt/my_cont/app_id/*/2022/11/17/* MapPartitionsRDD[80] at textFile at NativeMethodAccessorImpl.java:0

到目前为止我离得越近:我尝试复制代码here以使用调用将数据转换为数据帧,但我似乎做得不正确:

contents_field <- invoke_static(sc, "sparklyr.SQLUtils", "createStructField", "contents", "character", TRUE)
schema <- invoke_static(sc, "sparklyr.SQLUtils", "createStructType", list(contents_field))
j_df <- invoke(hive_context(sc), "createDataFrame", j_rdd, schema)
invoke(j_df, "createOrReplaceTempView", "tmp_test")
dfs <- tbl(sc, "tmp_test")
dfs %>% sdf_nrow()

我只有一列包含字符,所以我认为它可以工作,但我收到此错误:

Error : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 25.0 failed 4 times, most recent failure: Lost task 14.3 in stage 25.0 (TID 15158) (10.221.193.133 executor 2): java.lang.RuntimeException: Error while encoding: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, contents), StringType, false), true, false, true) AS contents#366
    at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1192)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:236)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:208)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:95)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:233)
    ... 28 more

有谁知道如何将这个 RDD 对象(使用 R/sparklyr)转换为无需收集数据即可使用的调用函数的返回值?

【问题讨论】:

    标签: r apache-spark databricks rdd sparklyr


    【解决方案1】:

    最后我发现spark_read_text也可以用joker读取多个文件,但是你必须为每个目录和文件放一个joker,它不能递归地发现文件夹。

    例如:

    dfs <- spark_read_text(sc, "/mnt/container/app_id/10/2023/02/06/*")
    

    ...不起作用。但:

    dfs <- spark_read_text(sc, "/mnt/container/app_id/10/2023/02/06/*/*/*")
    

    ...作品。还:

    dfs <- spark_read_text(sc, "/mnt/container/app_id/*/2023/02/06/*/*/*")
    

    ...在日期上方加上小丑也可以。

    由于目录深度在我的情况下没有改变,这对我来说已经足够了。

    【讨论】:

      猜你喜欢
      • 2018-06-25
      • 1970-01-01
      • 2017-08-24
      • 2016-04-21
      • 1970-01-01
      • 1970-01-01
      • 2020-09-06
      • 2019-01-26
      相关资源
      最近更新 更多