【问题标题】:Map function to write on global spark rdd在全局 spark rdd 上写入的映射函数
【发布时间】:2017-07-14 16:24:11
【问题描述】:

我有一个字符串的 RDD。每一行对应各种日志。

我在一个函数中有多个正则表达式,它们匹配/大小写 RDD 的行以应用适应的正则表达式。

我想将这个独特的函数映射到我的 RDD 上,这样它就可以快速处理每一行,并将处理过的每一行存储在另一个全局 rdd 中。

问题是,因为我希望这个任务被并行化,我的全局 RDD 必须可以同时访问以添加每个处理的行。

我想知道是否有其他方法可以做到这一点或任何事情!我希望提高我的火花技能。

例如,这就是我想做的:

我有一个像这样的 txt:

错误:哈哈哈 param_error=8 param_err2=https

警告:HUHUHUHUH param_warn=tchu param_warn2=wifi

我的正则表达式函数会将包含“ERROR”的行与一个数组匹配,例如Array("Error","8","https")

另一个正则表达式函数会将包含“警告”的行与一个数组匹配,例如Array("Warning","tchu","wifi")

最后,我想为处理的每一行获取一个RDD[Array[String]]

如何使其与 Spark 保持并行?

【问题讨论】:

  • “我在一个函数中有多个正则表达式,它们匹配/大小写 RDD 的行以应用适应的正则表达式” - 您可以编辑您的帖子以包含 签名这个功能?

标签: scala apache-spark


【解决方案1】:

首先,重要的是要了解在 Spark 中没有像“全局 RDD”这样的东西,也没有理由需要这样的东西。使用 Spark 时,您应该考虑一个 RDD 转换为另一个 RDD,而不是考虑 更新 RDD(这是不可能的 - RDD 不可变 em>)。每个这样的转换都将由 Spark 分布式(并行)执行。

在这种情况下,如果我正确理解您的要求,您会希望将map 每条记录转换为以下结果之一:

  • Array[String],其中第一项是 "ERROR",或者:
  • Array[String],其中第一项是 "WARNING",或者:
  • 如果没有模式匹配记录,删除

为此,您可以使用RDDmap(f)collect(f) 方法:

// Sample data:
val rdd = sc.parallelize(Seq(
  "ERROR : Hahhaha param_error=8 param_err2=https",
  "WARNING : HUHUHUHUH param_warn=tchu param_warn2=wifi",
  "Garbage - not matching anything"
))

// First we can split in " : " to easily identify ERROR vs. WARNING 
val splitPrefix = rdd.map(line => line.split(" : "))

// Implement these parsing functions as you see fit; 
// The input would be the part following the " : ", 
// and the output should be a list of the values (not including the ERROR / WARNING) 
def parseError(v: String): List[String] = ??? // example input: "Hahhaha param_error=8 param_err2=https"
def parseWarning(v: String): List[String] = ??? // example input: "HUHUHUHUH param_warn=tchu param_warn2=wifi"

// Now we can use these functions in a pattern-matching function passed to RDD.collect,
// which will transform each value that matches one of the cases, and will filter out 
// values that don't match anything
val result: RDD[List[String]] = splitPrefix.collect {
  case Array(l @ "ERROR", v) => l :: parseError(v)
  case Array(l @ "WARNING", v) => l :: parseWarning(v)
  // NOT adding a default case, so records that didn't match will be removed
}    

// If you really want Array[String] and not List[String]:    
val arraysRdd: RDD[Array[String]] = result.map(_.toArray)

【讨论】:

  • 哦,是的,该死的,这正是我想要的!非常感谢,我不知道我们可以像这样使用收集功能。每天都在 Spark 中进步,谢谢! :p
猜你喜欢
  • 1970-01-01
  • 2014-07-18
  • 2018-01-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-11-12
  • 2015-01-30
  • 1970-01-01
相关资源
最近更新 更多