【发布时间】:2017-05-04 12:18:39
【问题描述】:
我正在使用 Spark 读取文本文件。每行可以属于不同的案例类。一旦我将线条转换为案例类描述的对象,我会将它们转换为数据帧并写入 HDFS(镶木地板)。我遇到的问题是我最终得到了一个抽象类型的 RDD,我需要将其限制为特定的案例类类型以应用 toDF 函数。
到目前为止,我已将日志事件定义如下:
abstract class LogEvent
final case class Web(datetime: String, ... )
final case class OtherEvent(datetime: String ...)
我正在阅读我的文本文件,然后将行映射到模式匹配函数以创建 RDD[LogEvent]:
def convertToCase(e: List[String]): LogEvent= e match {
case List(_, _, _, "WEB", _, _, _, _, _, _, _, _, _) =>
Web(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
case List(_, _, _, "OTHEREVENT", _, _, _, _, _, _, _, _) =>
OtherEvent(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
}
此时我希望限制到给定的案例类并转换为 Spark 数据帧。比如:
val events = spark.read.textFile(...)
.map(_.split(',').toList)
.map(convertToCase)
然后我想将 RDD[LogEvent] 减少为 T 类型的 RDD,它可能在集合 {Web, OtherEvent} 中。这就是我正在努力解决的问题。应用带有谓词的过滤器来约束案例类不会更改 LogEvent 的类型,这意味着我不能调用“toDF()”,因为它必须在 RDD[T] 上调用,其中 T 是特定的案例类,而不是抽象类 RDD[LogEvent]。
val webEvents = events.filter(someLogic).toDF()
我正在寻找一种可以将通用 RDD 缩减为特定案例类的 RDD 的方法。我试图通过不使用 isInstanceOf 或 asInstanceOf 来实现这一点,同时保持类型安全。
有没有简单的解决方案?还是我以错误的方式解决问题?
提前致谢。
【问题讨论】:
标签: scala apache-spark