【发布时间】:2023-02-09 22:50:31
【问题描述】:
我有很多文件,其中包含由 azure IOT 推送到很多文件夹中的 blob 存储上的文本数据,我想读取它们并有一个 delta lake 表,文件的每一行都有一行。我以前是一个文件一个文件地读,但是太费时间了,所以我想用spark来加速这个处理。它需要集成一个用 R 编写的数据块工作流。
我找到了spark_read_text函数来读取文本文件,但它不能递归读取目录,它只能理解是否所有文件都在一个目录中。
下面是一个文件路径的例子(appid/partition/year/month/day/hour/minute/file): app_id/10/2023/02/06/08/42/gdedir22hccjq
Partition是azure IoT似乎为了并行处理数据而创建的随机文件夹(目前大约有30个),因此可以将同一日期的数据拆分在多个文件夹中,这并没有简化读取效率。
所以我发现唯一能做到这一点的函数是spark.textFile,它与小丑一起工作并递归处理目录。唯一的问题是它返回一个 RDD,我找不到将其转换为 spark 数据帧的方法,最终可以使用 tbl_spark R 对象访问它。
这是我到目前为止所做的:
您需要将配置设置为递归读取文件夹(这里我在专用 python 单元格中对数据块执行此操作):
%py
sc._jsc.hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
然后我可以创建一个 RDD:
j_rdd <- spark_context(sc) %>%
invoke("textFile", "/mnt/my_cont/app_id/*/2022/11/17/*", 10L)
这是创建 RDD 的工作,如您所见,我可以用“*”映射所有分区(年份之前),以及在末尾用“*”递归地映射四个小时和分钟的文件夹。
我可以收集它并创建一个 R 数据框:
lst <- invoke(j_rdd, "collect")
data.frame(row = unlist(lst))
这正确地获取了我的数据,每个文件的每一行都有一列文本和一行(出于隐私原因我无法显示示例,但这并不重要)。
问题是我不想收集,但想用这些数据更新一个增量表,但找不到一种方法来获取我可以使用的 sparklyr 对象。我拿到的j_rdd对象是这样的:
>j_obj
<jobj[2666]>
org.apache.spark.rdd.MapPartitionsRDD
/mnt/my_cont/app_id/*/2022/11/17/* MapPartitionsRDD[80] at textFile at NativeMethodAccessorImpl.java:0
到目前为止我离得越近:我尝试复制代码here以使用调用将数据转换为数据帧,但我似乎做得不正确:
contents_field <- invoke_static(sc, "sparklyr.SQLUtils", "createStructField", "contents", "character", TRUE)
schema <- invoke_static(sc, "sparklyr.SQLUtils", "createStructType", list(contents_field))
j_df <- invoke(hive_context(sc), "createDataFrame", j_rdd, schema)
invoke(j_df, "createOrReplaceTempView", "tmp_test")
dfs <- tbl(sc, "tmp_test")
dfs %>% sdf_nrow()
我只有一列包含字符,所以我认为它可以工作,但我收到此错误:
Error : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 25.0 failed 4 times, most recent failure: Lost task 14.3 in stage 25.0 (TID 15158) (10.221.193.133 executor 2): java.lang.RuntimeException: Error while encoding: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, contents), StringType, false), true, false, true) AS contents#366
at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1192)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:236)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:208)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:95)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:233)
... 28 more
有谁知道如何将这个 RDD 对象(使用 R/sparklyr)转换为无需收集数据即可使用的调用函数的返回值?
【问题讨论】:
标签: r apache-spark databricks rdd sparklyr