【问题标题】:How to properly close a variable amount of streams?如何正确关闭可变数量的流?
【发布时间】:2015-05-07 13:47:21
【问题描述】:

我正在创建多个必须并行(或可能并行)访问的流。我知道在编译时资源量是固定的情况下如何进行try-with-resources,但是如果资源量是由参数决定的呢?

我有这样的事情:

private static void foo(String path, String... files) throws IOException {
    @SuppressWarnings("unchecked")
    Stream<String>[] streams = new Stream[files.length];

    try {
        for (int i = 0; i < files.length; i++) {
            final String file = files[i];
            streams[i] = Files.lines(Paths.get(path, file))
                .onClose(() -> System.out.println("Closed " + file));
        }

        // do something with streams
        Stream.of(streams)
            .parallel()
            .flatMap(x -> x)
            .distinct()
            .sorted()
            .limit(10)
            .forEach(System.out::println);
    }
    finally {
        for (Stream<String> s : streams) {
            if (s != null) {
                s.close();
            }
        }
    }
}

【问题讨论】:

  • 您是否在询问是否有可以处理您的情况的试用资源?答案是否定的,但你所拥有的一切都很好。
  • 另一种方法是将流的开放移动到并行操作中,每个操作只需要处理一个流。
  • 是的,尽管有一个问题:理论上close() 可以(尽管在实践中不太可能)抛出一个UncheckedIOException,所以你应该将s.close() 包裹在try { s.close(); } catch (Exception ex) { //quash or log } 中。
  • 当心! sorted()/distinct()forEach(而不是 forEachOrdered)的组合是问题的根源。见stackoverflow.com/q/28259636/2711488

标签: java java-8 java-stream try-with-resources


【解决方案1】:

您可以编写一个复合AutoCloseable 来管理AutoCloseable 的动态数量:

import java.util.ArrayList;
import java.util.List;

public class CompositeAutoclosable<T extends AutoCloseable> implements AutoCloseable {
    private final List<T> components= new ArrayList<>();

    public void addComponent(T component) { components.add(component); }

    public List<T> getComponents() { return components; }

    @Override
    public void close() throws Exception {
        Exception e = null;
        for (T component : components) {
            try { component.close(); }
            catch (Exception closeException) {
                if (e == null) { e = closeException; }
                else { e.addSuppressed(closeException); }
            }
        }
        if (e != null) { throw e; }
    }
}

你可以在你的方法中使用它:

private static void foo(String path, String... files) throws Exception {
    try (CompositeAutoclosable<Stream<String>> streams 
            = new CompositeAutoclosable<Stream<String>>()) {
        for (int i = 0; i < files.length; i++) {
            final String file = files[i];
            streams.addComponent(Files.lines(Paths.get(path, file))
                .onClose(() -> System.out.println("Closed " + file)));
        }
        streams.getComponents().stream()
            .parallel()
            .flatMap(x -> x)
            .distinct()
            .sorted()
            .limit(10)
            .forEach(System.out::println);
    }
}

【讨论】:

  • 虽然我不喜欢创建自定义实用程序类,但它至少可以工作。我不喜欢它的原因是什么?当从事项目(不同部分)的其他开发人员有类似需求时,他们可能不了解实用程序类并设计自己的解决方案。
  • 这可以通过开发者之间的沟通来解决。 “昨天我遇到了这个问题,所以我写了一个实用程序......”
  • 如果你捕获了一个closeException 并且已经有一个先前的异常,你应该使用addSuppressed 而不是覆盖先前的异常...
  • 抓/扔Exception不是很糟糕吗?你不能在这里改用IOExcetion 吗?
  • @JonasCz:不幸的是,Streams 实现了AutoCloseable 而不是Closeable。虽然Stream.close 没有声明检查异常,但interface 确实如此。因此,尝试在不声明 throws Exception 的情况下编写代码将需要对 Stream 进行特殊处理,而不是通过 interface 对其进行一般处理。
【解决方案2】:

documentation of Stream.flatMap 说:

每个映射的流在其内容放入该流后关闭。

换句话说,对于流的普通关闭,不需要额外的操作。但是,由于只有处理过的流是关闭的,所以你不应该在不知道它们以后是否被流处理的情况下急切地创建流:

private static void foo(String path, String... files) throws IOException {
    Arrays.stream(files).flatMap(file-> {
              try { return Files.lines(Paths.get(path, file))
                    .onClose(() -> System.out.println("Closed " + file)); }
              catch(IOException ex) { throw new UncheckedIOException(ex); } })
          .parallel()
          .distinct()
          .sorted()
          .limit(10)
          .forEachOrdered(System.out::println);
}

通过在flatMap 中创建子流,可以保证每个子流仅在流将要处理它时创建。因此,即使在 try-with-resource 语句中没有外部 Stream,此解决方案也会关闭所有子流。

【讨论】:

  • 虽然我确信您的解决方案有效,但很难阅读并查看每个语句中发生的情况。每个开发人员都应该更喜欢易于阅读的代码而不是优化和/或聪明的代码,否则会导致维护问题或错误。
  • 您确定您的第一个解决方案会像问题中的原始代码一样并行读取文件吗?我认为parallel 应该在flatMap 之前应用。
  • @Przemyslaw Zych:parallel() 的位置无关紧要,它将整个处理变为并行,因为流管道作为一个整体是并行的或顺序的。
猜你喜欢
  • 2017-03-29
  • 2012-12-15
  • 2021-06-26
  • 1970-01-01
  • 2021-07-28
  • 2012-04-08
  • 2015-06-24
  • 1970-01-01
  • 2017-04-30
相关资源
最近更新 更多