【发布时间】:2018-10-16 21:40:19
【问题描述】:
使用 Apache Beam Java SDK 2.1.0
我们在 Google Cloud 存储桶中有多个错误日志,我们正在读取此路径中的所有文件,使用 TextIO.read
我们需要对它们进行聚合,以便发现正在发送的消息中的错误。我们有一个可以工作的正则表达式,从每个文件中过滤不同的行,问题是TextIO.read 的性质我们得到一个PCollection 行,所以当尝试在pardo 中应用正则表达式时,它会处理它一行一行,正则表达式永远找不到匹配项。
正则表达式:"MESSAGE:(.*)\\n\\[MESSAGE\\].*(\\\"entityName\\\":\\\"\\w+\\\")"
TextIO.read 是否会返回 PCollection 的文件,每个元素一个文件,所以当它获得 parDo 时,我们可以访问完整的元素?
作为 b 计划,有没有办法使用 Apache Beam 将每个文件的所有行合并为一个,因此正则表达式模式有效?
public static class ExtractWordsFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) {
final String regex = "MESSAGE:(.*)\\n\\[MESSAGE\\].*(\\\"entityName\\\":\\\"\\w+\\\")";
final Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE | Pattern.UNICODE_CASE);
Matcher m = pattern.matcher(c.element());
List<String> entities = new ArrayList<String>();
while (m.find()) {
System.out.println("Full match: " + m.group(0));
for (int i = 1; i <= m.groupCount(); i++) {
entities.add(m.group(i));
}
}
// Output each word encountered into the output PCollection.
for (String entity : entities) {
c.output(entity);
}
}
}
【问题讨论】:
标签: regex google-cloud-dataflow apache-beam