【问题标题】:How to filter an rdd by data type?如何按数据类型过滤rdd?
【发布时间】:2019-01-26 22:29:19
【问题描述】:

我有一个 rdd,我试图只过滤浮点类型。 Spark rdds 是否提供任何方法来执行此操作?

我有一个 csv,我只需要将大于 40 的浮点值放入新的 rdd。为此,我正在检查它是否是 float 类型的实例并过滤它们。当我使用! 过滤时,所有字符串仍然存在于输出中,当我不使用! 时,输出为空。

val airports1 = airports.filter(line => !line.split(",")(6).isInstanceOf[Float])
val airports2 = airports1.filter(line => line.split(",")(6).toFloat > 40)

.toFloat,我遇到了NumberFormatException,我试图在try catch 块中处理它。

【问题讨论】:

    标签: scala apache-spark rdd


    【解决方案1】:

    因为你有一个纯字符串并且你试图从中获取浮点值,你实际上并没有按类型过滤。但是,如果它们可以被解析为浮动。
    您可以使用flatMapOption 来完成此操作。

    import org.apache.spark.sql.SparkSession
    import scala.util.Try
    
    val spark = SparkSession.builder.master("local[*]").appName("Float caster").getOrCreate()
    val sc = spark.sparkContext
    
    val data = List("x,10", "y,3.3", "z,a")
    val rdd = sc.parallelize(data) // rdd: RDD[String]
    val filtered = rdd.flatMap(line => Try(line.split(",")(1).toFloat).toOption) // filtered: RDD[Float]
    filtered.collect() // res0: Array[Float] = Array(10.0, 3.3)
    

    对于> 40 部分,您可以在Option 之后执行另一个过滤器或过滤内部Option
    (由于火花惰性,两者都应该执行或多或少相等,因此选择一个更清楚给你).

    // Option 1 - Another filter.
    val filtered2 = filtered.filter(x => x > 40)
    
    // Option 2 - Filter the inner option in one step.
    val filtered = rdd.flatMap(line => Try(line.split(",")(1).toFloat).toOption.filter(x => x > 40))
    

    如果您有任何问题,请告诉我。

    【讨论】:

    • 非常感谢您的及时回复。真的很感激。在实现这个之后,我仍然看到我的输出像这样"North Sea" Some(59.35) "Shepparton" None。有没有办法从这里删除选项输出?
    • @swamoch 即使用 flatMap 一个选项可以被视为 zeroone 元素的集合,flatMap 将其变平如果您仍然使用filter,则删除 Nones 并解压缩 Somes,这就是您仍然拥有 Option 包装器的原因。删除它们的最佳方法是使用flatMapflatten,或者使用getOrElse 方法检索给定的值,如果没有值则使用默认值。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-11-06
    相关资源
    最近更新 更多