【发布时间】:2018-10-05 12:36:08
【问题描述】:
我在 Cassandra (3.11.2) 中有数据,这也是我的 df :
Cassandra 中的数据:
id | some_data
-- | ---------
1 | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]
2 | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]
3 | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]
df 详细信息:
df.printSchema()
//| |-- id: integer (nullable = true)
//| |-- some_data: array (nullable = true)
//| | |-- element: struct (containsNull = true)
//| | | |-- s1: string (nullable = true)
//| | | |-- s2: string (nullable = true)
这里的 Cassandra 模式定义为:
id : 字符串
some_data : 列出冻结的 test_udt 创建为 --> 创建类型 test.test_udt ( s1 文本, s2 文本 );
我正在使用 spark-cassandra-connector 2.0 从 Cassandra 提取数据以在 Spark 2.2.1 上进行处理。
需要的输出
输出是df的分解形式
id | some_data | s1 | s2
-- | ---------------------------------------------------| ----- | ----
1 | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]| str11 | str12
1 | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]| str13 | str14
2 | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]| str21 | str22
2 | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]| str23 | str24
3 | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]| str31 | str32
3 | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]| str33 | str34
我过去的做法
我使用过 spark-cassandra-connector 1.6 和 Spark 1.6,我对上述问题有一个巧妙的解决方案:
import org.apache.spark.sql.functions._
case class my_data(s1 : String, s2 : String)
val flatData = df.explode(df("some_data")){
case Row(x : Seq[Row]) =>
x.map(x =>
my_data(
x.apply(0).asInstanceOf[String],
x.apply(1).asInstanceOf[String]
))
}
flatData.show()
升级到 2.x 后,我在使用 explode 函数时遇到错误。火花文档说explode 已弃用。建议将flatMap 作为explode 的替代品。
问题:
- 如何在 Scala 中分解 Dataframe 以获得与以前相同的结果?
- 如何使用
flatmap翻译我的旧代码?
【问题讨论】:
-
尝试使用
df.col("some_data").as[my_data] -
@vindev:这就像转换为数据集吗?我这样做了:
val b = df.col("some_data").as[my_data],我得到了一个错误:Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. not enough arguments for method as: (implicit evidence$1: org.apache.spark.sql.Encoder[U])org.apache.spark.sql.TypedColumn[Any,U]. Unspecified value parameter evidence$1. -
如错误所说,需要导入
spark.implicits._ -
@vindev :我的代码中已经有
spark.implicits._。我的坏..我应该在这里提到它。尽管添加了它,但我看到了同样的错误。以下解决方案对我有用。谢谢你回答我的问题:)
标签: scala apache-spark cassandra spark-cassandra-connector apache-spark-2.2