【问题标题】:Filtering an RDD[T] to a subclass of type T将 RDD[T] 过滤为 T 类型的子类
【发布时间】:2017-05-04 12:18:39
【问题描述】:

我正在使用 Spark 读取文本文件。每行可以属于不同的案例类。一旦我将线条转换为案例类描述的对象,我会将它们转换为数据帧并写入 HDFS(镶木地板)。我遇到的问题是我最终得到了一个抽象类型的 RDD,我需要将其限制为特定的案例类类型以应用 toDF 函数。

到目前为止,我已将日志事件定义如下:

abstract class LogEvent
final case class Web(datetime: String, ... )
final case class OtherEvent(datetime: String ...)

我正在阅读我的文本文件,然后将行映射到模式匹配函数以创建 RDD[LogEvent]:

def convertToCase(e: List[String]): LogEvent= e match {
  case List(_, _, _, "WEB", _, _, _, _, _, _, _, _, _) =>
    Web(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
  case List(_, _, _, "OTHEREVENT", _, _, _, _, _, _, _, _) =>
    OtherEvent(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
}

此时我希望限制到给定的案例类并转换为 Spark 数据帧。比如:

val events = spark.read.textFile(...)
  .map(_.split(',').toList)
  .map(convertToCase)

然后我想将 RDD[LogEvent] 减少为 T 类型的 RDD,它可能在集合 {Web, OtherEvent} 中。这就是我正在努力解决的问题。应用带有谓词的过滤器来约束案例类不会更改 LogEvent 的类型,这意味着我不能调用“toDF()”,因为它必须在 RDD[T] 上调用,其中 T 是特定的案例类,而不是抽象类 RDD[LogEvent]。

val webEvents = events.filter(someLogic).toDF()

我正在寻找一种可以将通用 RDD 缩减为特定案例类的 RDD 的方法。我试图通过不使用 isInstanceOf 或 asInstanceOf 来实现这一点,同时保持类型安全。

有没有简单的解决方案?还是我以错误的方式解决问题?

提前致谢。

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    您应该使用collect(f: PartialFunction[T, U]): RDD[U] 方法(不要与collect(): Array[T]混淆,后者将结果作为数组发送给驱动程序):

    val webEvents = events.collect{
      case w: Web => w 
    }.toDF()
    

    collectmapfilter 的混合:如果输入匹配模式匹配中给出的一种情况,它将输出偏函数给出的值。否则,它将简单地忽略(即过滤掉)输入。

    请注意,您可能也应该为您的convertToCase 执行此操作,因为您定义的模式匹配不完整,如果遇到意外事件或损坏的行,您可能会在运行时收到错误。正确的方法是定义

    val convertToCase: PartialFunction[List[String], LogEvent] = {
      case List(_, _, _, "WEB", _, _, _, _, _, _, _, _, _) =>
        Web(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
      case List(_, _, _, "OTHEREVENT", _, _, _, _, _, _, _, _) =>
        OtherEvent(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
    }
    

    然后将map(convertToCase) 替换为collect(convertToCase)

    【讨论】:

    • 谢谢!我偶然发现了 collect 并认为它可以解决我的问题,但是与 Spark 一起使用这不会将所有事件返回给驱动程序吗?我应该在正文中指定,但我认为由于 RDD 实现而无法使用 collect?
    • collect()collect[U](pf: PartialFunction[T, U]) 是 RDD 上两种截然不同的方法。只要为方法提供部分函数参数,您就会安全(否则,编译器确实会假定您要收集到驱动程序)。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-08-04
    • 1970-01-01
    • 1970-01-01
    • 2018-10-27
    相关资源
    最近更新 更多