【问题标题】:Conflating Java streams合并 Java 流
【发布时间】:2019-04-12 08:47:04
【问题描述】:

我有非常大的版本化文档流,按文档 ID 和版本排序。

例如Av1、Av2、Bv1、Cv1、Cv2

我必须将其转换为另一个流,其记录按文档 ID 聚合。

A[v1, v2], B[v1], C[v1, V2]

不使用Collectors.groupBy() 可以做到这一点吗?我不想使用groupBy(),因为它会在对流中的所有项目进行分组之前将它们加载到内存中。理论上,不需要将整个流加载到内存中,因为它是有序的。

【问题讨论】:

  • 您能否更清楚地说明该语句.. 因为它会在分组之前将流中的所有项目加载到内存中
  • 流管道默认是惰性的。它们按需加载数据,这对于开发低内存占用的代码很有用。因此,人们可以一次处理一个项目,而无需将所有项目都保存在内存中。但是对于像 groupBy 这样的操作,必须消耗流中的所有项目才能构建结果。有意义吗?
  • 我认为您正在寻找 StreamEx.groupRuns 的功能:amaembo.github.io/streamex/javadoc/one/util/streamex/…
  • 当您将groupingBy(Function,Collector) 与下游收集器一起使用时聚合到不引用单个元素的结果,这些元素不会保存在内存中。关键是你想对结果流做什么。显然,您假设后续操作不需要内存中保存的所有内容。然后,只需在第一个 collect 操作中执行此操作即可。
  • @millimoose 你的建议非常适合我。您想将其发布为答案吗?

标签: java java-stream collectors


【解决方案1】:

这是我想出的解决方案:

    Stream<Document> stream = Stream.of(
            new Document("A", "v1"),
            new Document("A", "v2"),
            new Document("B", "v1"),
            new Document("C", "v1"),
            new Document("C", "v2")
    );

    Iterator<Document> iterator = stream.iterator();
    Stream<GroupedDocument> result = Stream.generate(new Supplier<GroupedDocument>() {

        Document lastDoc = null;
        @Override
        public GroupedDocument get() {
            try {
                Document doc = Optional.ofNullable(lastDoc).orElseGet(iterator::next);

                String id = doc.getId();
                GroupedDocument gd = new GroupedDocument(doc.getId());
                gd.getVersions().add(doc.getVersion());

                if (!iterator.hasNext()) {
                    return null;
                }

                while (iterator.hasNext() && (doc = iterator.next()).getId().equals(id)) {
                    gd.getVersions().add(doc.getVersion());
                }
                lastDoc = doc;
                return gd;
            } catch (NoSuchElementException ex) {
                return null;
            }
        }
    });

这里是 DocumentGroupedDocument 类:

class Document {
    private String id;
    private String version;

    public Document(String id, String version) {
        this.id = id;
        this.version = version;
    }

    public String getId() {
        return id;
    }

    public String getVersion() {
        return version;
    }
}

class GroupedDocument {
    private String id;
    private List<String> versions;

    public GroupedDocument(String id) {
        this.id = id;
        versions = new ArrayList<>();
    }

    public String getId() {
        return id;
    }

    public List<String> getVersions() {
        return versions;
    }

    @Override
    public String toString() {
        return "GroupedDocument{" +
                "id='" + id + '\'' +
                ", versions=" + versions +
                '}';
    }
}

请注意,生成的流是无限流。在所有组之后,将有无限数量的nulls。您可以通过在 Java 9 中使用 takeWhile 来获取所有不为空的元素,或查看此 post

【讨论】:

  • 非常感谢您的回答。我正在使用@millimoose 提出的建议。我将我的流发送到 Spring MVC 进行编组。因此,当使用 takeWhile 之类的东西开始出现空值时,它会更加努力地停止消费流。
【解决方案2】:

您可以在StreamEx library 中使用groupRuns

class Document {
    public String id;
    public int version;
    public Document(String id, int version) {
        this.id = id;
        this.version = version;
    }
    public String toString() {
        return "Document{"+id+version+ "}";
    }
}

public class MyClass {
    private static List<Document> docs = asList(
        new Document("A", 1),
        new Document("A", 2),
        new Document("B", 1),
        new Document("C", 1),
        new Document("C", 2)
    );

    public static void main(String args[]) {
        StreamEx<List<Document>> groups = StreamEx.of(docs).groupRuns((l, r) -> l.id.equals(r.id));
        for (List<Document> grp: groups.collect(toList())) {
            out.println(grp);
        }
    }
}

哪个输出:

[文档{A1},文档{A2}]
[文档{B1}]
[文档{C1},文档{C2}]

我无法验证这不会消耗整个流,但我无法想象为什么它需要给出 groupRuns 的意图。

【讨论】:

    【解决方案3】:

    Map&lt;String, Stream&lt;String&gt;&gt; 会帮助您满足您的需求吗?

    A - v1、v2
    B - v1
    C - v1、v2

     String[] docs = { "Av1", "Av2", "Bv1", "Cv1", "Cv2"};
     Map<String, Stream<String>> map = Stream.<String>of(docs).
             map(s ->s.substring(0, 1)).distinct().                      //leave only A B C
                collect(Collectors.toMap( s1 -> s1,                      //A B C as keys
                                          s1 ->Stream.<String>of(docs).  //value is filtered stream of docs
                                            filter(s2 -> s1.substring(0, 1).
                                              equals(s2.substring(0, 1)) ).
                                                map(s3 -> s3.substring(1, s3.length())) //trim A B C
                                         ));        
    

    【讨论】:

    • 我有一个非常大的流要处理。 collect() 不会将所有内容都读入内存吗?
    猜你喜欢
    • 2011-04-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-10-20
    • 2019-03-05
    • 2015-10-27
    • 2019-04-17
    相关资源
    最近更新 更多