【问题标题】:How to parse log lines using Spark that could span multiple lines如何使用可能跨越多行的 Spark 解析日志行
【发布时间】:2015-09-04 23:56:43
【问题描述】:

我正在开发一个可以读取和解析自定义日志文件的 Spark/Scala 应用程序。我在解析多行日志条目时遇到问题。这是我的代码的 sn-p:

case class MLog(dateTime: String, classification: String, serverType: String, identification:String, operation: String)
val PATTERN = """(?s)(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\s+(\w+)s+\[(.*)\]\s+\[(.*)\]\s+(.*)"""


def parseLogLine(log: String): MLog={
     val res = PATTERN.findFirstMatchIn(log)
     if (res.isEmpty) {
     throw new RuntimeException("Cannot parse log line: " + log)

     MLog(m.group(1),m.group(2),m.group(3),m.group(4),m.group(5))
}

sc.textFile("/mydirectory/logfile").map(parseLogLine).foreach(println)

日志文件中的某些条目跨越多行。正则表达式适用于单行条目,但是当读取多行条目时,如下所示,

2015-08-31 00:10:17,682 WARN  [ScheduledTask-10] [name=custname;mid=9999;ds=anyvalue;] datasource - Scheduled DataSource import failed.                 
com.xxx.common.service.ServiceException: system failure: Unable to connect to ANY server: LdapDataSource{id=xxx, type=xxx, enabled=true, name=xxx, host=xxx port=999, connectionType=ssl, username=xxx, folderId=99999}

我收到此错误:

无法解析日志行:com.xxx.common.service.ServiceException:系统故障:无法连接到任何服务器:LdapDataSource{id=xxx, type=xxx, enabled=true, name=xxx, host=xxx 端口=999,connectionType=ssl,username=xxx,folderId=99999}

如何让 Spark 从日志文件中读取多行日志条目?

【问题讨论】:

  • 单个文件有多大?这些可以装在一个工人身上吗?
  • 是的。该文件很小 - 大约 40MB。该函数将打印出单行条目,但是当它到达多行条目时,它会失败并出现我提到的错误。

标签: regex scala apache-spark multiline


【解决方案1】:

由于输入文件很小,您可以使用SparkContext.wholeTextFiles

// Parse a single file and return all extracted entries
def parseLogFile(log: String): Iterator[MLog] = {
    val p: scala.util.matching.Regex = ???
    p.findAllMatchIn(log).map(
        m => MLog(m.group(1), m.group(2), m.group(3), m.group(4), m.group(5))
    )
}

val rdd: RDD[MLog] = sc
   .wholeTextFiles("/path/to/input/dir")
   .flatMap{case (_, txt) => parseLogFile(txt)}

【讨论】:

  • 谢谢!那行得通!如果我想使用 sc.textFiles 方法怎么办?随着时间的推移,日志文件可能会变得非常大,我想确保我仍然可以使用相同的代码。
  • 通常没有单一的方法来处理多行记录。如果您可以处理一些信息丢失,您可以简单地mapPartitions 并删除不完整的记录。如果不是,您可以编写自定义 Hadoop 输入格式或mapPartitions,并稍后修复跨越两个分区的记录。最后,您可以尝试配置日志记录,以将输入文件保持在可由单个工作人员处理的合理大小。
  • 如何判断一个文件是否小到可以被wholeTextFiles()处理? (我的文件约为 250 MB ...)
  • @c-rod 这是一个使用 textFile 的示例,主要区别在于使用 Regex 方法 findFirstMatchIn 返回 Option[Match] 而不是 findAllMatchIn 返回 Iterator[Match] 并使用 @ 987654328@ 而不是 flatmap。在wholeTextFiles 示例中,您需要一个迭代器,因为它将包含文件中的所有匹配项,因此需要对该迭代器进行平面映射。使用 textFile,您只需 map 行。请参阅:community.hortonworks.com/articles/34362/… 请注意,textFile 方法将拆分行,因此没有多行。你需要 wholeTextFiles
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-05-08
  • 2018-11-02
  • 2012-04-20
  • 1970-01-01
  • 1970-01-01
  • 2020-06-04
  • 1970-01-01
相关资源
最近更新 更多