[Posted]: 2018-03-27 23:57:22
[Question]:
I have hundreds of large (6 GB) gzipped log files that I read with GZIPInputStreams and want to parse. Suppose each one has the format:
Start of log entry 1
...some log details
...some log details
...some log details
Start of log entry 2
...some log details
...some log details
...some log details
Start of log entry 3
...some log details
...some log details
...some log details
I'm streaming the gzip file contents line by line via BufferedReader.lines(). The stream looks like:
[
"Start of log entry 1",
" ...some log details",
" ...some log details",
" ...some log details",
"Start of log entry 2",
" ...some log details",
" ...some log details",
" ...some log details",
"Start of log entry 3",
" ...some log details",
" ...some log details",
" ...some log details",
]
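For reference, here is a minimal sketch of the reading side just described. The gzip-compressed bytes are wrapped in a GZIPInputStream, then an InputStreamReader, then a BufferedReader, and consumed through the lazy lines() stream. The class name GzipLines and the helpers gzip and forEachLine are hypothetical. The in-memory compression step exists only so the sketch runs without a real log file, and UTF-8 is assumed as the log encoding.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipLines {
    // Compresses text in memory so the demo runs without a real 6 GB file.
    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // read after close(), so the gzip trailer is included
    }

    // Decompresses and hands each line to the consumer. lines() is lazy, so only
    // the readers' buffers are held in memory at any time, not the whole file.
    static void forEachLine(InputStream compressed, Consumer<String> action) {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(compressed), StandardCharsets.UTF_8))) {
            reader.lines().forEach(action);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = gzip("Start of log entry 1\n ...some log details\n");
        forEachLine(new ByteArrayInputStream(data), System.out::println);
    }
}
```

For a real file, `new GZIPInputStream(Files.newInputStream(path), 1 << 16)` would replace the in-memory bytes. The second argument is the decompressor's buffer size in bytes, and 64 KiB is an arbitrary choice.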
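The start of each log entry can be identified by the predicate line -> line.startsWith("Start of log entry"). I'd like to transform this Stream<String> into a Stream<Stream<String>> based on this predicate. Each "substream" should begin at a line where the predicate is true and keep collecting lines while the predicate is false. The next line where the predicate is true marks the end of this substream and the start of the next one. The result would look like this: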
[
[
"Start of log entry 1",
" ...some log details",
" ...some log details",
" ...some log details",
],
[
"Start of log entry 2",
" ...some log details",
" ...some log details",
" ...some log details",
],
[
"Start of log entry 3",
" ...some log details",
" ...some log details",
" ...some log details",
],
]
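One possible way to get this grouping lazily is to wrap the line iterator in a custom Spliterator that buffers only the entry currently being built. The sketch below is hypothetical, since splitByPredicate is not a JDK method. As a deliberate assumption, it emits each entry as a List<String> rather than a Stream<String>. Inner streams over one sequential source would have to be consumed strictly in order, and a small per-entry list avoids that pitfall. Any lines that appear before the first start line form their own group.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class SplitByPredicate {
    // Lazily groups consecutive lines: a new group starts at each line matching
    // isStart. Only the group currently being built is held in memory.
    static Stream<List<String>> splitByPredicate(Stream<String> lines, Predicate<String> isStart) {
        Iterator<String> it = lines.iterator();
        Spliterator<List<String>> groups = new Spliterators.AbstractSpliterator<List<String>>(
                Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
            private List<String> current = null; // group under construction

            @Override
            public boolean tryAdvance(Consumer<? super List<String>> action) {
                while (it.hasNext()) {
                    String line = it.next();
                    if (isStart.test(line) && current != null) {
                        List<String> done = current; // previous group is complete
                        current = new ArrayList<>();
                        current.add(line);           // this line opens the next group
                        action.accept(done);
                        return true;
                    }
                    if (current == null) {
                        current = new ArrayList<>();
                    }
                    current.add(line);
                }
                if (current != null) { // end of input: flush the last group
                    List<String> done = current;
                    current = null;
                    action.accept(done);
                    return true;
                }
                return false;
            }
        };
        return StreamSupport.stream(groups, false).onClose(lines::close);
    }

    public static void main(String[] args) {
        Stream<String> lines = Stream.of(
                "Start of log entry 1", " ...some log details", " ...some log details",
                "Start of log entry 2", " ...some log details");
        splitByPredicate(lines, line -> line.startsWith("Start of log entry"))
                .forEach(System.out::println);
    }
}
```

Where a Stream<Stream<String>> is really needed, `.map(List::stream)` converts the result. Memory use stays bounded by the size of the largest single log entry rather than by the file size.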
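From there, I can take each substream and map it to new LogEntry(Stream<String> logLines), so that the related log lines are aggregated into LogEntry objects.
Here's roughly what it looks like: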
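The question does not show LogEntry, so the following is only a hypothetical shape for it. The first line is kept as the entry header and the remaining lines as its details. The field and accessor names are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical shape: the question does not define LogEntry's fields.
public class LogEntry {
    private final String header;        // e.g. "Start of log entry 1"
    private final List<String> details; // the lines that follow the header

    LogEntry(Stream<String> logLines) {
        Iterator<String> it = logLines.iterator();
        if (!it.hasNext()) {
            throw new IllegalArgumentException("a log entry needs at least a header line");
        }
        this.header = it.next();
        List<String> rest = new ArrayList<>();
        it.forEachRemaining(rest::add); // consumes the rest of this entry's lines
        this.details = rest;
    }

    String header() { return header; }

    List<String> details() { return details; }

    @Override
    public String toString() {
        return header + " (" + details.size() + " detail lines)";
    }

    public static void main(String[] args) {
        LogEntry entry = new LogEntry(Stream.of("Start of log entry 1", " ...some log details"));
        System.out.println(entry);
    }
}
```

If entries arrive as List<String> instead of Stream<String>, `.map(lines -> new LogEntry(lines.stream()))` adapts them to this constructor.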
import java.io.*;
import java.nio.charset.*;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
import static java.lang.System.out;

class Untitled {
    static final String input =
        "Start of log entry 1\n" +
        " ...some log details\n" +
        " ...some log details\n" +
        " ...some log details\n" +
        "Start of log entry 2\n" +
        " ...some log details\n" +
        " ...some log details\n" +
        " ...some log details\n" +
        "Start of log entry 3\n" +
        " ...some log details\n" +
        " ...some log details\n" +
        " ...some log details";

    static final Predicate<String> isLogEntryStart = line -> line.startsWith("Start of log entry");

    public static void main(String[] args) throws Exception {
        try (ByteArrayInputStream gzipInputStream
                 = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)); // mock for fileInputStream based gzipInputStream
             InputStreamReader inputStreamReader = new InputStreamReader( gzipInputStream );
             BufferedReader reader = new BufferedReader( inputStreamReader )) {
            reader.lines()
                .splitByPredicate(isLogEntryStart) // <--- What witchcraft should go here?
                .map(LogEntry::new)
                .forEach(out::println);
        }
    }
}
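Constraint: I have hundreds of these large files to process in parallel (but only one sequential stream per file), which makes loading them fully into memory (e.g., storing them as a List<String> lines) infeasible.
Any help is appreciated!
[Comments]: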
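The constraint above (parallel across files, strictly sequential within each file) could look roughly like the following sketch. A parallel stream runs over the file paths, and each file is decompressed and scanned by a single thread. Counting entry-start lines stands in for the real per-file parsing. The class and method names, the 64 KiB buffer size and the temp-file setup are assumptions made so that the sketch is runnable.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class ParallelFiles {
    static final Predicate<String> IS_START = line -> line.startsWith("Start of log entry");

    // Reads one gzip file sequentially on the calling thread and counts entry starts.
    // Memory use is bounded by the buffers, not by the file size.
    static long countEntries(Path file) {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(file), 1 << 16), StandardCharsets.UTF_8))) {
            return reader.lines().filter(IS_START).count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Parallelism is across files only: each file is still read by one thread, in order.
    static Map<Path, Long> countAll(List<Path> files) {
        return files.parallelStream()
                .collect(Collectors.toConcurrentMap(file -> file, ParallelFiles::countEntries));
    }

    // Demo helper: writes text to a gzip temp file that is deleted on JVM exit.
    static Path writeTempGzip(String text) {
        try {
            Path p = Files.createTempFile("log", ".gz");
            p.toFile().deleteOnExit();
            try (Writer w = new OutputStreamWriter(
                    new GZIPOutputStream(Files.newOutputStream(p)), StandardCharsets.UTF_8)) {
                w.write(text);
            }
            return p;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<Path> files = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            files.add(writeTempGzip("Start of log entry 1\n ...some log details\nStart of log entry 2\n"));
        }
        System.out.println(countAll(files));
    }
}
```

One design caveat: parallelStream() runs on the common ForkJoinPool, which is sized for CPU-bound work. For hundreds of I/O-heavy files, a dedicated ExecutorService with an explicitly chosen thread count may be easier to tune.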
-
Sounds like a job for StreamEx.
-
@shmosel Interesting, looking into it! Do you know what the API might be called? I tried keywords like "partition", "slice", "chunk" and "delimit", but with no luck.
-
Maybe you could use collapse() with a (line1, line2) -> line1.startsWith(...) && !line2.startsWith(...) predicate.
-
I'd suggest Spring Integration for processing multiple files in parallel. I once processed 50 files of 4-5 GB each in parallel on separate cores: stackoverflow.com/questions/31819189/…
-
On second thought, collapse() might not work correctly if it is evaluated against adjacent elements.
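To make the adjacent-element concern concrete, here is a plain-Java sketch of a "group adjacent elements while a pair predicate holds" operation, applied to an in-memory list for brevity. groupRuns is a hypothetical helper written for illustration and is not the StreamEx API. With the pair predicate suggested above, a detail line only joins a group when the line before it is a header, so each entry gets broken apart. A predicate that looks only at the second line keeps every entry together.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Predicate;

public class AdjacentRuns {
    // Puts each element in the same group as its predecessor while
    // sameGroup.test(previous, next) is true; otherwise starts a new group.
    static <T> List<List<T>> groupRuns(List<T> items, BiPredicate<T, T> sameGroup) {
        List<List<T>> groups = new ArrayList<>();
        List<T> current = null;
        T previous = null;
        for (T item : items) {
            if (current == null || !sameGroup.test(previous, item)) {
                current = new ArrayList<>();
                groups.add(current);
            }
            current.add(item);
            previous = item;
        }
        return groups;
    }

    public static void main(String[] args) {
        Predicate<String> isStart = line -> line.startsWith("Start");
        List<String> lines = Arrays.asList("Start 1", " a", " b", "Start 2", " c");
        // Pair predicate from the comment: joins only a header with the line right after it.
        System.out.println(groupRuns(lines, (a, b) -> isStart.test(a) && !isStart.test(b)));
        // Looking only at the second line: every non-start line joins the run before it.
        System.out.println(groupRuns(lines, (a, b) -> !isStart.test(b)));
    }
}
```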
Tags: java split java-stream lazy-evaluation predicate