【问题标题】:How do I write a Dataset encoder to support mapping a function to a org.apache.spark.sql.Dataset[String] in Scala Spark如何编写数据集编码器以支持将函数映射到 Scala Spark 中的 org.apache.spark.sql.Dataset[String]
【发布时间】:2020-07-07 00:07:52
【问题描述】:

从 Spark 1.6 迁移到 Spark 2.2* 带来了错误“错误:无法找到存储在“数据集”中的类型的编码器。尝试将方法应用于从查询 parquet 表返回的数据集时的原始类型(Int、String 等)。 我过度简化了我的代码来演示同样的错误。代码查询 parquet 文件以返回以下数据类型: 'org.apache.spark.sql.Dataset [org.apache.spark.sql.Row]' 我应用一个函数来提取一个字符串和整数,返回一个字符串。返回以下内容 数据类型:数组[字符串] 接下来,我需要执行需要单独功能的大量操作。在这个测试函数中,我尝试附加一个字符串,产生与我的详细示例相同的错误。 我尝试了一些编码器示例和“案例”的使用,但没有提出可行的解决方案。任何建议/示例将不胜感激

scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, 
cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]

val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x" 
(1)+","+s.getDecimal(1);

scala> var d2 =  d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]

scala> d2.take(1)
20/03/25 19:01:08 WARN TaskSetManager: Stage 3 contains a task of very large size (131 KB). The 
maximum recommended task size is 100 KB.
res10: Array[String] = Array(ab04006000504304,1522194407.95162)

scala> def dd(s:String){
 | s + "some string"
 | }
dd: (s: String)Unit

scala> var d3 = d2.map{s=> dd(s) }
<console>:47: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support 
for serializing other types will be added in future releases.

为了进一步提炼问题,我相信这个场景(虽然我没有尝试所有可能的解决方案)可以进一步简化为以下代码:

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String){
 | s + "hi"
 | }
f: (s: String)Unit

scala> var test2 = test.map{ s => f(s) }
<console>:42: error: Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) are 
supported by importing spark.implicits._  Support for serializing other types 
will be added in future releases.
   var test2 = test.map{ s => f(s) }

【问题讨论】:

  • 您的函数 dd() 正在返回 Unit for d2.map{s=&gt; dd(s) }
  • 为了进一步简化,我试图将一个函数映射到我的查询(数据集)的输出。这种代码风格在 spark 1.6 下完美运行。我尝试了以下帖子中提到的一些技术:stackoverflow.com/questions/39433419/…,包括使用“case”和“getAs”都无济于事。
  • hello koiralo:你是对的,我通过 def f(s: String): String = 在我的函数定义中更明确地解决了这个问题,尽管关键是在尝试 test.rdd.map。

标签: scala apache-spark dataset encoder


【解决方案1】:

至少我的简化问题(如下)有一个解决方案。 我会测试更多......

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String): String = {
 | val r = s + "hi"
 | return r
 | }
f: (s: String)String

scala> var test2 = test.rdd.map{ s => f(s) }
test2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at map at <console>:43

scala> test2.take(1)
res9: Array[String] = Array(just some wordshi)

【讨论】:

  • 当前尝试无法扩展到我的应用程序,导致相关错误:org.apache.spark.SparkException: Task not serializable。如果我获取数据的子集并执行显式 toDF() 它将起作用,但不适用于完整的数据集
【解决方案2】:

第一个解决方案不适用于我的初始(生产)数据集,而是产生错误“org.apache.spark.SparkException:Task not serializable”(有趣的是,虽然两者都存储为相同的数据类型(org.apache. spark.sql.Dataset[String] = [value: string]) 我认为是相关的。我为我的测试数据集提供了另一个解决方案,它消除了初始编码器错误,如图所示实际上适用于我的玩具问题,不斜坡到生产数据集。对于我的应用程序在从 1.6 到 2.3 版本 spark 的迁移中被搁置的确切原因有点困惑,因为多年来我不必对我的应用程序进行任何特殊调整并且已经成功运行它进行计算最有可能数万亿。其他探索包括将我的方法包装为可序列化,探索@transient关键字,利用“org.apache.spark.serializer.KryoSerializer”,将我的方法编写为函数并将所有变量更改为' vals'(跟随'stack' 上的 ng 相关帖子)。

scala>  import spark.implicits._
import spark.implicits._

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String): String = {
 |   val r = s + "hi"
 |   return r
 |   }
 f: (s: String)String

 scala> var d2 =  test.map{s => f(s)}(Encoders.STRING)
 d2: org.apache.spark.sql.Dataset[String] = [value: string]

 scala> d2.take(1)
 res0: Array[String] = Array(just some wordshi)

斯卡拉>

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-07-30
    • 1970-01-01
    • 2021-01-19
    • 2021-05-23
    • 2017-12-15
    • 1970-01-01
    • 2018-10-07
    • 2020-03-16
    相关资源
    最近更新 更多