【发布时间】:2016-02-07 15:01:40
【问题描述】:
对 Spark 有点陌生。那么问题来了,我有一个通用格式的 txt 文件,比如说:Time:Message 所以我必须实现两件事:RDD 和模式组以及匹配。
将文件作为rdd:
val rdd1 = sc.textFile(location)
构建模式:
private val f1 = "([0-9]*)"
private val f2 = "([:])"
private val f3 = "(.*?)"
private val regex = s"$f1 $f2 $f3"
private val p = Pattern.compile(regex)
现在我想整合这两个,
rdd1.map(//What to do here)
我想检查每一行是否与一般格式匹配。如果不匹配,我想为每行不匹配的行显示一条错误消息。
如果匹配,我想为上述模式分组。 f1 是 group1,f2 是 group2,f3 是第三组。最后我想在 f3(message field) 中搜索 error,failure 等关键字。
我知道这有很多要求。提前谢谢。
我已经尝试过的:
def parseLine(s1: String): Option[Groups] = {
val matcher = p.matcher(s1)
if (matcher.find) {
println("General Format correct")
//group
Some(group(matcher))
//after format is verified search f3 for error,failure keyword.
}
else {
println("Format Wrong")
None
}
}
def group(matcher: Matcher) = {
Line(
matcher.group(1),
matcher.group(2),
matcher.group(3))}
case class Line(Time: String, colon: String, Message: String)
现在我被困在如何迭代 rdd 以将文本文件的每一行传递给函数。如果我将整个 rdd 传递给函数,即 RDD[String] 类型。像 matcher 这样的其他元素不起作用因为它需要 String 类型。 在阅读 rdd 函数时:(如果我错了,请纠正我),foreach 方法应该迭代 rdd 但我得到类型不匹配。目前正在尝试地图功能,但还没有。
正如我所说,我是 spark rdd 的新手。我不知道使用分区函数是否会帮助我而不是分组。
我真的需要一些有经验的人的指导。任何帮助都会得到帮助。
【问题讨论】:
-
"我想检查每一行是否符合一般格式。I" 这与 Spark 无关。你试过什么? “我想为上述模式创建组”“创建组”是什么意思?你将如何处理这个小组? “我想搜索”……然后呢? (请不要使用“我想”,这会让你听起来像在幼儿园)
-
“Spark 提供的主要抽象是弹性分布式数据集 (RDD),它是跨集群节点分区的元素集合,可以并行操作。”我在文本文件的第一行代码中使用了它。好的,我会更具体地说明我的尝试。
-
map是您想要的功能。从您的代码来看,rdd1.map(parseLine(_))应该可以工作。 -
感谢您的回复
标签: regex scala apache-spark rdd