【问题标题】:Java split stream by predicate into stream of streamsJava通过谓词将流拆分为流
【发布时间】:2018-03-27 23:57:22
【问题描述】:

我有数百个大型 (6GB) gzip 日志文件,我正在使用我希望解析的 GZIPInputStreams 读取这些文件。假设每个都有格式:

Start of log entry 1
    ...some log details
    ...some log details
    ...some log details
Start of log entry 2
    ...some log details
    ...some log details
    ...some log details
Start of log entry 3
    ...some log details
    ...some log details
    ...some log details

我正在通过BufferedReader.lines() 逐行传输 gzip 文件内容。流看起来像:

[
    "Start of log entry 1",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
    "Start of log entry 2",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
    "Start of log entry 2",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
]

每个日志条目的开头可以由谓词标识:line -> line.startsWith("Start of log entry")。我想根据这个谓词将这个Stream<String> 转换成Stream<Stream<String>>。每个“子流”应该在谓词为真时开始,并在谓词为假时收集行,直到下一次谓词为真,这表示这个子流的结束和下一个的开始。结果如下:

[
    [
        "Start of log entry 1",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
    [
        "Start of log entry 2",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
    [
        "Start of log entry 3",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
]

从那里,我可以获取每个子流并将其映射到new LogEntry(Stream<String> logLines),以便将相关的日志行聚合到LogEntry 对象中。

这是一个大概的样子:

import java.io.*;
import java.nio.charset.*;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

import static java.lang.System.out;

class Untitled {
    static final String input = 
        "Start of log entry 1\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "Start of log entry 2\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "Start of log entry 3\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details";

    static final Predicate<String> isLogEntryStart = line -> line.startsWith("Start of log entry"); 

    public static void main(String[] args) throws Exception {
        try (ByteArrayInputStream gzipInputStream
        = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)); // mock for fileInputStream based gzipInputStream
             InputStreamReader inputStreamReader = new InputStreamReader( gzipInputStream ); 
             BufferedReader reader = new BufferedReader( inputStreamReader )) {

            reader.lines()
                .splitByPredicate(isLogEntryStart) // <--- What witchcraft should go here?
                .map(LogEntry::new)
                .forEach(out::println);
        }
    }
}

约束:我有数百个这样的大文件要并行处理(但每个文件只有一个顺序流),这使得将它们完全加载到内存中(例如,将它们存储为List&lt;String&gt; lines)是不可行的.

任何帮助表示赞赏!

【问题讨论】:

  • 听起来像是 StreamEx 的工作。
  • @shmosel 有趣,正在研究!您知道可能会调用什么 API 吗?我尝试了诸如“分区”、“切片”、“块”和“分隔”之类的关键字,但无济于事
  • 也许您可以将collapse()(line1, line2) -&gt; line1.startsWith(...) &amp;&amp; !line2.startsWith(...) 谓词一起使用。
  • 我建议 Spring Integration 并行处理多个文件。我曾经在单独的内核中并行处理 50 个 4-5 GB 的文件stackoverflow.com/questions/31819189/…
  • 再想一想,collapse() 如果针对相邻元素进行评估,可能无法正常工作。

标签: java split java-stream lazy-evaluation predicate


【解决方案1】:

Frederico 的回答可能是解决这个特定问题的最佳方式。在他最后一次想到自定义Spliterator 之后,我将添加一个对a similar question 的答案的改编版本,我建议使用自定义迭代器来创建分块流。这种方法也适用于不是由输入阅读器创建的其他流。

public class StreamSplitter<T>
    implements Iterator<Stream<T>>
{
    private Iterator<T>  incoming;
    private Predicate<T> startOfNewEntry;
    private T            nextLine;

    public static <T> Stream<Stream<T>> streamOf(Stream<T> incoming, Predicate<T> startOfNewEntry)
    {
        Iterable<Stream<T>> iterable = () -> new StreamSplitter<>(incoming, startOfNewEntry);
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    private StreamSplitter(Stream<T> stream, Predicate<T> startOfNewEntry)
    {
        this.incoming = stream.iterator();
        this.startOfNewEntry = startOfNewEntry;
        if (incoming.hasNext())
            nextLine = incoming.next();
    }

    @Override
    public boolean hasNext()
    {
        return nextLine != null;
    }

    @Override
    public Stream<T> next()
    {
        List<T> nextEntrysLines = new ArrayList<>();
        do
        {
            nextEntrysLines.add(nextLine);
        } while (incoming.hasNext()
                 && !startOfNewEntry.test((nextLine = incoming.next())));

        if (!startOfNewEntry.test(nextLine)) // incoming does not have next
            nextLine = null;

        return nextEntrysLines.stream();
    }
}

示例

public static void main(String[] args)
{
    Stream<String> flat = Stream.of("Start of log entry 1",
                                    "    ...some log details",
                                    "    ...some log details",
                                    "Start of log entry 2",
                                    "    ...some log details",
                                    "    ...some log details",
                                    "Start of log entry 3",
                                    "    ...some log details",
                                    "    ...some log details");

    StreamSplitter.streamOf(flat, line -> line.matches("Start of log entry.*"))
                  .forEach(logEntry -> {
                      System.out.println("------------------");
                      logEntry.forEach(System.out::println);
                  });
}

// Output
// ------------------
// Start of log entry 1
//     ...some log details
//     ...some log details
// ------------------
// Start of log entry 2
//     ...some log details
//     ...some log details
// ------------------
// Start of log entry 3
//     ...some log details
//     ...some log details

迭代器总是向前看一行。一旦该 lline 是新条目的开头,它就会将前一个条目包装在一个流中并将其返回为next。工厂方法streamOf 将此迭代器转换为流,以便在我上面给出的示例中使用。

我将拆分条件从正则表达式更改为Predicate,因此您可以借助多个正则表达式、if 条件等指定更复杂的条件。

请注意,我仅使用上面的示例数据对其进行了测试,因此我不知道它在更复杂、错误或空输入时会如何表现。

【讨论】:

    【解决方案2】:

    我认为主要问题是您正在逐行读取并尝试在行外创建 LogEntry 实例,而不是逐块读取(可能涵盖多行)。

    为此,您可以使用带有适当正则表达式的Scanner.findAll(Java 9 起可用):

    String input =
            "Start of log entry 1\n"        +
            "    ...some log details 1.1\n" +
            "    ...some log details 1.2\n" +
            "    ...some log details 1.3\n" +
            "Start of log entry 2\n"        +
            "    ...some log details 2.1\n" +
            "    ...some log details 2.2\n" +
            "    ...some log details 2.3\n" +
            "Start of log entry 3\n"        +
            "    ...some log details 3.1\n" +
            "    ...some log details 3.2\n" +
            "    ...some log details 3.3";
    
    try (ByteArrayInputStream gzip = 
             new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
         InputStreamReader reader = new InputStreamReader(gzip);
         Scanner scanner = new Scanner(reader)) {
    
        String START = "Start of log entry \\d+";
        Pattern pattern = Pattern.compile(
                START + "(?<=" + START + ").*?(?=" + START + "|$)", 
                Pattern.DOTALL);
    
        scanner.findAll(pattern)
                .map(MatchResult::group)
                .map(s -> s.split("\\R"))
                .map(LogEntry::new)
                .forEach(System.out::println);
    
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
    

    因此,这是通过在 Scanner 实例中懒惰地查找匹配项来实现的。 Scanner.findAll 返回 Stream&lt;MatchResult&gt;MatchResult.group() 返回匹配的 String。然后我们用换行符 (\\R) 分割这个字符串。这将返回一个String[],数组的每个元素都是每一行。然后,假设LogEntry 有一个接受String[] 参数的构造函数,我们将这些数组中的每一个转换为LogEntry 实例。最后,假设 LogEntry 有一个覆盖的 toString() 方法,我们将每个 LogEntry 实例打印到输出。

    值得一提的是,Scanner 在流上调用forEach 时开始工作。

    另外一个注释是我们用来匹配输入中的日志条目的正则表达式。我不是正则表达式领域的专家,所以我几乎可以肯定这里有很大的改进空间。首先,我们使用Pattern.DOTALL,以便. 不仅匹配常见字符,还匹配换行符。然后,有实际的正则表达式。这个想法是它匹配并消耗Start of log entry \\d+,然后它使用一个look-behind来对抗Start of log entry \\d+,然后它以一个非贪婪的形式消耗来自输入的字符方式(这是.*? 部分),最后它前瞻检查是否再次出现Start of log entry \\d+ 或是否已到达输入的末尾。如果你想深入研究这个主题,请参考这个amazing article about regular expressions


    如果您不在 Java 9+ 上,我不知道有任何类似的替代方案。但是,您可以做的是创建一个自定义的Spliterator,它包装由BufferedReader.lines() 返回的流返回的Spliterator,并向其中添加所需的解析行为。然后,您需要从这个Spliterator 中创建一个新的Stream。根本不是小事……

    【讨论】:

    • 很好,这看起来是个不错的方法。日志条目的开头多种多样且不容易匹配,但我会试一试并报告!
    • 很棒的发现,这个方法应该让很多输入处理变得简单多了!可能比自定义 (Spl)Iterator 更适合这种特殊情况。唯一的优势是能够处理来自阅读器以外的其他来源的流。
    猜你喜欢
    • 2014-01-11
    • 2021-09-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多