【发布时间】:2020-07-07 00:07:52
【问题描述】:
从 Spark 1.6 迁移到 Spark 2.2* 带来了错误“错误:无法找到存储在“数据集”中的类型的编码器。尝试将方法应用于从查询 parquet 表返回的数据集时的原始类型(Int、String 等)。 我过度简化了我的代码来演示同样的错误。代码查询 parquet 文件以返回以下数据类型: 'org.apache.spark.sql.Dataset [org.apache.spark.sql.Row]' 我应用一个函数来提取一个字符串和整数,返回一个字符串。返回以下内容 数据类型:数组[字符串] 接下来,我需要执行需要单独功能的大量操作。在这个测试函数中,我尝试附加一个字符串,产生与我的详细示例相同的错误。 我尝试了一些编码器示例和“案例”的使用,但没有提出可行的解决方案。任何建议/示例将不胜感激
scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string,
cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]
val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x"
(1)+","+s.getDecimal(1);
scala> var d2 = d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]
scala> d2.take(1)
20/03/25 19:01:08 WARN TaskSetManager: Stage 3 contains a task of very large size (131 KB). The
maximum recommended task size is 100 KB.
res10: Array[String] = Array(ab04006000504304,1522194407.95162)
scala> def dd(s:String){
| s + "some string"
| }
dd: (s: String)Unit
scala> var d3 = d2.map{s=> dd(s) }
<console>:47: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int,
String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support
for serializing other types will be added in future releases.
为了进一步提炼问题,我相信这个场景(虽然我没有尝试所有可能的解决方案)可以进一步简化为以下代码:
scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]
scala> def f(s: String){
| s + "hi"
| }
f: (s: String)Unit
scala> var test2 = test.map{ s => f(s) }
<console>:42: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing spark.implicits._ Support for serializing other types
will be added in future releases.
var test2 = test.map{ s => f(s) }
【问题讨论】:
-
您的函数
dd()正在返回 Unit ford2.map{s=> dd(s) } -
为了进一步简化,我试图将一个函数映射到我的查询(数据集)的输出。这种代码风格在 spark 1.6 下完美运行。我尝试了以下帖子中提到的一些技术:stackoverflow.com/questions/39433419/…,包括使用“case”和“getAs”都无济于事。
-
hello koiralo:你是对的,我通过 def f(s: String): String = 在我的函数定义中更明确地解决了这个问题,尽管关键是在尝试 test.rdd.map。
标签: scala apache-spark dataset encoder