【问题标题】:Explode Cassandra UDT with flatmap in Spark 2.x (Scala)在 Spark 2.x (Scala) 中使用 flatmap 爆炸 Cassandra UDT
【发布时间】:2018-10-05 12:36:08
【问题描述】:

我在 Cassandra (3.11.2) 中有数据,这也是我的 df :

Cassandra 中的数据:

id | some_data  
-- | ---------  
1  | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]
2  | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]
3  | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]

df 详细信息:

 df.printSchema() 
    //|  |-- id: integer (nullable = true)
    //|  |-- some_data: array (nullable = true)
    //|  |    |-- element: struct (containsNull = true)
    //|  |    |    |-- s1: string (nullable = true)
    //|  |    |    |-- s2: string (nullable = true)

这里的 Cassandra 模式定义为:

id : 字符串
some_data : 列出冻结的 test_udt 创建为 --> 创建类型 test.test_udt ( s1 文本, s2 文本 );

我正在使用 spark-cassandra-connector 2.0 从 Cassandra 提取数据以在 Spark 2.2.1 上进行处理。

需要的输出

输出是df的分解形式

id | some_data                                          | s1    | s2  
-- | ---------------------------------------------------| ----- | ---- 
1  | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]| str11 | str12
1  | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]| str13 | str14 
2  | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]| str21 | str22
2  | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]| str23 | str24
3  | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]| str31 | str32
3  | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]| str33 | str34

我过去的做法

我使用过 spark-cassandra-connector 1.6 和 Spark 1.6,我对上述问题有一个巧妙的解决方案:

import org.apache.spark.sql.functions._    
case class my_data(s1 : String, s2 : String)

val flatData = df.explode(df("some_data")){
            case Row(x : Seq[Row]) =>
                x.map(x =>
                    my_data(
                        x.apply(0).asInstanceOf[String], 
                        x.apply(1).asInstanceOf[String]
                    ))
                  }
flatData.show()

升级到 2.x 后,我在使用 explode 函数时遇到错误。火花文档说explode 已弃用。建议将flatMap 作为explode 的替代品。

问题:

  1. 如何在 Scala 中分解 Dataframe 以获得与以前相同的结果?
  2. 如何使用flatmap 翻译我的旧代码?

【问题讨论】:

  • 尝试使用df.col("some_data").as[my_data]
  • @vindev:这就像转换为数据集吗?我这样做了:val b = df.col("some_data").as[my_data],我得到了一个错误:Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. not enough arguments for method as: (implicit evidence$1: org.apache.spark.sql.Encoder[U])org.apache.spark.sql.TypedColumn[Any,U]. Unspecified value parameter evidence$1.
  • 如错误所说,需要导入spark.implicits._
  • @vindev :我的代码中已经有spark.implicits._。我的坏..我应该在这里提到它。尽管添加了它,但我看到了同样的错误。以下解决方案对我有用。谢谢你回答我的问题:)

标签: scala apache-spark cassandra spark-cassandra-connector apache-spark-2.2


【解决方案1】:

您可以使用explode function,这也是explode 方法的替代方法。 getItem 用于通过名称从struct 中获取字段。

df.withColumn("exploded" , explode($"some_data"))
  .withColumn("s1" , $"exploded".getItem("s1"))
  .withColumn("s2" , $"exploded".getItem("s2"))
  .drop("exploded")
  .show(false)

//+---+------------------------------+-----+-----+
//|id |some_data                     |s1   |s2   |
//+---+------------------------------+-----+-----+
//|1  |[[str11,str12], [str13,str14]]|str11|str12|
//|1  |[[str11,str12], [str13,str14]]|str13|str14|
//|2  |[[str21,str22], [str23,str24]]|str21|str22|
//|2  |[[str21,str22], [str23,str24]]|str23|str24|
//|3  |[[str31,str32], [str33,str44]]|str31|str32|
//|3  |[[str31,str32], [str33,str44]]|str33|str44|
//+---+------------------------------+-----+-----+

【讨论】:

  • 这行得通!谢谢你。您认为在语句中使用多个 explode 与使用 flatmap 时性能如何?
  • 您不能在一个语句中使用多个explodes。性能应该等同于建议作为 flatMapexplode 方法的替代方法
猜你喜欢
  • 2022-01-16
  • 1970-01-01
  • 2016-11-09
  • 1970-01-01
  • 2017-08-08
  • 1970-01-01
  • 2019-07-24
  • 2018-04-27
  • 2019-04-04
相关资源
最近更新 更多