【问题标题】:Scala RDD with pattern matching from a text file具有来自文本文件的模式匹配的 Scala RDD
【发布时间】:2016-02-07 15:01:40
【问题描述】:

对 Spark 有点陌生。那么问题来了,我有一个通用格式的 txt 文件,比如说:Time:Message 所以我必须实现两件事:RDD 和模式组以及匹配。

将文件作为rdd:

val rdd1 = sc.textFile(location)

构建模式:

private val f1 = "([0-9]*)"
private val f2 = "([:])"
private val f3 = "(.*?)"
private val regex = s"$f1 $f2 $f3"
private val p = Pattern.compile(regex)

现在我想整合这两个,

rdd1.map(//What to do here) 

我想检查每一行是否与一般格式匹配。如果不匹配,我想为每行不匹配的行显示一条错误消息。

如果匹配,我想为上述模式分组。 f1 是 group1,f2 是 group2,f3 是第三组。最后我想在 f3(message field) 中搜索 error,failure 等关键字。

我知道这有很多要求。提前谢谢。

我已经尝试过的:

def parseLine(s1: String): Option[Groups] = {
val matcher = p.matcher(s1)
if (matcher.find) {
  println("General Format correct")
  //group
  Some(group(matcher))
  //after format is verified search f3 for error,failure keyword.

}
else {
  println("Format Wrong")
  None
}
}

def group(matcher: Matcher) = {
Line(
  matcher.group(1),
  matcher.group(2),
  matcher.group(3))}

case class Line(Time: String, colon: String, Message: String)

现在我被困在如何迭代 rdd 以将文本文件的每一行传递给函数。如果我将整个 rdd 传递给函数,即 RDD[String] 类型。像 matcher 这样的其他元素不起作用因为它需要 String 类型。 在阅读 rdd 函数时:(如果我错了,请纠正我),foreach 方法应该迭代 rdd 但我得到类型不匹配。目前正在尝试地图功能,但还没有。

正如我所说,我是 spark rdd 的新手。我不知道使用分区函数是否会帮助我而不是分组。

我真的需要一些有经验的人的指导。任何帮助都会得到帮助。

【问题讨论】:

  • "我想检查每一行是否符合一般格式。I" 这与 Spark 无关。你试过什么? “我想为上述模式创建组”“创建组”是什么意思?你将如何处理这个小组? “我想搜索”……然后呢? (请不要使用“我想”,这会让你听起来像在幼儿园)
  • “Spark 提供的主要抽象是弹性分布式数据集 (RDD),它是跨集群节点分区的元素集合,可以并行操作。”我在文本文件的第一行代码中使用了它。好的,我会更具体地说明我的尝试。
  • map 是您想要的功能。从您的代码来看,rdd1.map(parseLine(_)) 应该可以工作。
  • 感谢您的回复

标签: regex scala apache-spark rdd


【解决方案1】:

对于像这样的简单示例,使用 RDD 的方式通常与使用简单的 Scala 序列的方式相同。如果我们将 Spark 排除在外,那么一种方法是:

import scala.util.{Failure, Success}

val input = List(
  "123 : Message1 Error",
  "This line doesn't conform",
  "345 : Message2",
  "Neither does this one",
  "789 : Message3 Error"
)

val f1 = "([0-9]*)"
val f2 = "([:])"
val f3 = "(.*)"
val regex = s"$f1 $f2 $f3".r

case class Line(a: String, b: String, c: String)


//Use Success and Failure as the functional way of representing
//operations which might not succeed
val parsed = input.map { str =>
  regex.findFirstMatchIn(str).map(m => Line(m.group(1), m.group(2), m.group(3))) match {
    case Some(l) => Success(l)
    case None => Failure(new Exception(s"Non matching input: $str"))
  }
}

//Now split the parsed result so we can handle the two types of outcome separately
val matching = parsed.filter(_.isSuccess)
val nonMatching = parsed.filter(_.isFailure)

nonMatching.foreach(println)

//Filter for only those messages we're interested in
val messagesWithError = matching.collect{
  case Success(l @ Line(_,_,m)) if m.contains("Error") => l
}

messagesWithError.foreach(println)

我们在 Spark 中执行此操作有什么不同?不多:

  val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Example"))

  import scala.util.{Failure, Success}

  val input = List(
    "123 : Message1 Error",
    "This line doesn't conform",
    "345 : Message2",
    "Neither does this one",
    "789 : Message3 Error"
  )

  val f1 = "([0-9]*)"
  val f2 = "([:])"
  val f3 = "(.*)"
  val regex = s"$f1 $f2 $f3".r

  case class Line(a: String, b: String, c: String)

  val inputRDD = sc.parallelize(input)

  //Use Success and Failure as the functional way of representing
  //operations which might not succeed
  val parsedRDD = inputRDD.map { str =>
    regex.findFirstMatchIn(str).map(m => Line(m.group(1), m.group(2), m.group(3))) match {
      case Some(l) => Success(l)
      case None => Failure(new Exception(s"Non matching input: $str"))
    }
  }

  //Now split the parsed result so we can handle the two types of outcome separately
  val matchingRDD = parsedRDD.filter(_.isSuccess)
  val nonMatchingRDD = parsedRDD.filter(_.isFailure)

  //We use collect() to bring the results back from the Spark workers to the Driver
  nonMatchingRDD.collect().foreach(println)

  //Filter for only those messages we're interested in
  val messagesWithError = matchingRDD.collect {
    case Success(l@Line(_, _, m)) if m.contains("Error") => l
  }

  //We use collect() to bring the results back from the Spark workers to the Driver
  messagesWithError.collect().foreach(println)
}

如果生成的数据集非常大,使用collect() 将结果提供给驱动程序是不合适的,但使用println 记录结果也不合适。

【讨论】:

  • 感谢您的回复
猜你喜欢
  • 2017-01-08
  • 2016-03-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-11-09
  • 2022-01-16
  • 1970-01-01
相关资源
最近更新 更多