【问题标题】:Calling forEach on a stream causes java.lang.IllegalStateException在流上调用 forEach 会导致 java.lang.IllegalStateException
【发布时间】:2020-11-12 05:43:09
【问题描述】:

我有一个执行以下操作的程序:

  1. 在 Main 方法中存储文件名

  2. 将该文件从 Main 传递给下面的方法(StreamParser)

  3. 方法 StreamParser 将该文件作为流读取

  4. StreamParser 应该返回 Stream

  5. 在 main 方法中,当我在 purchaseEventStream 上调用 forEach 时,会出现一行错误

    purchaseEventStream.forEach(purchaseEvent -> {

Exception in thread "main" java.lang.IllegalStateException: source already consumed or 
    closed
    at java.base/java.util.stream.AbstractPipeline.sourceSpliterator(AbstractPipeline.java:409)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at com.cognitree.internship.streamprocessing.Main.main(Main.java:22)

StreamParser 类

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class StreamParser {

public Stream<PurchaseEvent> parser(String fileName) throws IOException {
    Stream<PurchaseEvent> purchaseEventStream;
    try (Stream<String> lines = Files.lines(Paths.get(fileName))) {
        purchaseEventStream= lines.map(line -> {
            String[] fields = line.split(",");
            PurchaseEvent finalPurchaseEvent = new PurchaseEvent();
            finalPurchaseEvent.setSessionId(fields[0]);
            finalPurchaseEvent.setTimeStamp(fields[1]);
            finalPurchaseEvent.setItemId(fields[2]);
            finalPurchaseEvent.setPrice(fields[3]);
            finalPurchaseEvent.setQuantity(fields[4]);
            return finalPurchaseEvent;
        });
        return purchaseEventStream;
    }
}
}

主类

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class Main {
public static void main(String[] args) throws IOException {
    OutputStreamWriter outputStream = new OutputStreamWriter(new 
    FileOutputStream("output1.txt"));
    String file = "/Users/mohit/intern-mohit/yoochoose-buys.dat";
    StreamParser streamParser = new StreamParser();
    List<ReportGenerator> reports = new ArrayList<>();
    PurchaseEventCount purchaseEventCount = new PurchaseEventCount();
    QuantityPerSession quantityPerSession = new QuantityPerSession();
    SessionCount sessionCount = new SessionCount();
    reports.add(purchaseEventCount);
    reports.add(sessionCount);
    reports.add(quantityPerSession);
    Stream<PurchaseEvent> purchaseEventStream = streamParser.parser(file);
    purchaseEventStream.forEach(purchaseEvent -> {
        for (ReportGenerator report : reports) {
            report.generateReports(purchaseEvent);
        }
    });
    reports.forEach(report -> {
        try {
            report.printReports(outputStream);
        } catch (IOException e) {
            e.printStackTrace();
        }
    });
}
}

为什么我会收到错误消息?

【问题讨论】:

  • 我建议您阅读如何编写minimal reproducible example。您问题中的代码不是可重现的示例。为了重现您的问题,不需要 PurchaseEventReportGeneratorPurchaseEventCountQuantityPerSessionSessionCount 类。

标签: java java-stream illegalstateexception


【解决方案1】:

java 中的流不是集合。它不存储数据。您应该从 StreamParser 类中的方法 parser() 创建并返回一个集合,然后从返回的集合创建一个流。

我重写了您的 StreamParser 类以返回 List

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

public class StreamParser {

    public List<PurchaseEvent> parser(String fileName) throws IOException {
        List<PurchaseEvent> purchaseEventStream = Files.lines(Paths.get(fileName))
                                                       .map(line -> {
                                                           String[] fields = line.split(",");
                                                           PurchaseEvent finalPurchaseEvent = new PurchaseEvent();
                                                           finalPurchaseEvent.setSessionId(fields[0]);
                                                           finalPurchaseEvent.setTimeStamp(fields[1]);
                                                           finalPurchaseEvent.setItemId(fields[2]);
                                                           finalPurchaseEvent.setPrice(fields[3]);
                                                           finalPurchaseEvent.setQuantity(fields[4]);
                                                           return finalPurchaseEvent;
                                                       })
                                                       .collect(Collectors.toList());
        return purchaseEventStream;
    }
}

我相应地更改了您的 Main 课程。

import java.io.*;
import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) throws IOException {
        OutputStreamWriter outputStream = new OutputStreamWriter(new FileOutputStream("output1.txt"));
        String file = "/Users/mohit/intern-mohit/yoochoose-buys.dat";
        StreamParser streamParser = new StreamParser();
        List<ReportGenerator> reports = new ArrayList<>();
        PurchaseEventCount purchaseEventCount = new PurchaseEventCount();
        QuantityPerSession quantityPerSession = new QuantityPerSession();
        SessionCount sessionCount = new SessionCount();
        reports.add(purchaseEventCount);
        reports.add(sessionCount);
        reports.add(quantityPerSession);
        List<PurchaseEvent> purchaseEventStream = streamParser.parser(file);
        purchaseEventStream.forEach(purchaseEvent -> {
            for (ReportGenerator report : reports) {
                report.generateReports(purchaseEvent);
            }
        });
        reports.forEach(report -> {
            try {
                report.printReports(outputStream);
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        });
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-08-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多