【Question】: Apache Beam framework - sort in descending order
【Posted】: 2020-05-15 15:49:26
【Description】:

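How can I sort in descending order with the Apache Beam framework?

I managed to create a word count pipeline that sorts its output alphabetically by word, but I could not figure out how to reverse the sort order.

The code is as follows: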

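import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
import org.apache.beam.sdk.extensions.sorter.SortValues;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class SortedWordCount {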

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

        BufferedExternalSorter.Options options1 = BufferedExternalSorter.options();

        p.apply(TextIO.read().from("d:/dev/playground/apache/beam/word-count-beam/src/test/resources/bible/whole_bible.txt"))
                .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
                            if (!word.isEmpty()) {
                                c.output(word);
                            }
                        }
                    }
                }))
                .apply(Count.perElement())
                .apply(ParDo.of(new DoFn<KV<String, Long>, KV<String, Long>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c){
                        KV<String, Long> element = c.element();
                        if(element.getKey().length() > 2) {
                            c.output(element);
                        }
                    }
                }))
                .apply("CreateKey", MapElements.via(new SimpleFunction<KV<String, Long>, KV<String, KV<String, Long>>>() {
                    public KV<String, KV<String, Long>> apply(KV<String, Long> input) {
                        return KV.of("sort", KV.of(input.getKey().toLowerCase(), input.getValue()));
                    }
                }))
                .apply(GroupByKey.create())
                .apply(SortValues.create(options1))
                .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Iterable<KV<String, Long>>>, String>() {
                    @Override
                    public String apply(KV<String, Iterable<KV<String, Long>>> input) {
                        return StreamSupport.stream(input.getValue().spliterator(), false)
                                .map(value -> String.format("%20s: %s", value.getKey(), value.getValue()))
                                .collect(Collectors.joining(String.format("%n")));
                    }
                }))
                .apply(TextIO.write().to("bible"));
        // Run the pipeline.
        p.run().waitUntilFinish();
    }
}

This code produces a list of words sorted alphabetically:

           aaron: 350
       aaronites: 2
         abaddon: 1
         abagtha: 1
           abana: 1
          abarim: 4
           abase: 4
          abased: 4
         abasing: 1
          abated: 6
            abba: 3
            abda: 2
          abdeel: 1
            abdi: 3
          abdiel: 1
           abdon: 8
        abednego: 15
            abel: 16
 abelbethmaachah: 2
        abelmaim: 1

Edit 1:

After some debugging, I found that the code uses this class:

org.apache.beam.sdk.extensions.sorter.InMemorySorter

When it sorts, this class uses a static final Comparator:

private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();

public Iterable<KV<byte[], byte[]>> sort() {
  checkState(!sortCalled, "sort() can only be called once.");

  sortCalled = true;

  Comparator<KV<byte[], byte[]>> kvComparator =
    new Comparator<KV<byte[], byte[]>>() {

      @Override
      public int compare(KV<byte[], byte[]> o1, KV<byte[], byte[]> o2) {
        return COMPARATOR.compare(o1.getKey(), o2.getKey());
      }
    };
  Collections.sort(records, kvComparator);
  return Collections.unmodifiableList(records);
}

There is no way to inject a comparator into this class.

【Question discussion】:

    Tags: java sorting apache-beam
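To make the limitation concrete, the following sketch imitates the behaviour described above using only the JDK: keys are encoded to bytes and compared lexicographically as unsigned bytes. The class `ByteOrderDemo` and its helpers `compareUnsigned` and `sortByEncodedBytes` are illustrative stand-ins, not Beam or Guava APIs, and encoding the words as UTF-8 is an assumption made for the demo.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ByteOrderDemo {

    // Hypothetical stand-in for an unsigned lexicographic byte comparator:
    // compares position by position, treating each byte as a value in 0..255.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (diff != 0) {
                return diff;
            }
        }
        // An array that is a prefix of the other one sorts first.
        return a.length - b.length;
    }

    // Sorts words by their UTF-8 bytes in ascending order, which is the
    // only order a hard-coded comparator of this kind can produce.
    static List<String> sortByEncodedBytes(List<String> words) {
        List<String> sorted = new ArrayList<>(words);
        sorted.sort((w1, w2) -> compareUnsigned(
                w1.getBytes(StandardCharsets.UTF_8),
                w2.getBytes(StandardCharsets.UTF_8)));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> ascending =
                sortByEncodedBytes(Arrays.asList("abel", "aaron", "abba"));
        System.out.println(ascending);

        // With a fixed comparator, descending order has to be produced
        // after the sort, for example by reversing the result.
        List<String> descending = new ArrayList<>(ascending);
        Collections.reverse(descending);
        System.out.println(descending);
    }
}
```

Because the comparison happens on encoded bytes rather than on the original objects, any change of order has to happen either in how the key is encoded or after the sorted values come back.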


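    【Solution 1】:

    I ended up following jkff's advice and rewrote the minimal WordCount example with Apache Beam. I also dropped SortValues: I simply group all records under a single key and do the sorting myself.

    Here is what I came up with: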

    import org.apache.beam.examples.common.ExampleUtils;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.*;
    import org.apache.beam.sdk.values.KV;
    
    import java.util.ArrayList;
    import java.util.function.Supplier;
    import java.util.stream.StreamSupport;
    
    public class DescendingWordCount {
    
        public static void main(String[] args) {
            PipelineOptions options = PipelineOptionsFactory.create();
            Pipeline p = Pipeline.create(options);
            p.apply(TextIO.read().from("d:/whole_bible.txt"))
                    .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                            for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
                                if (word.length() > 1) {
                                    c.output(word.toLowerCase());
                                }
                            }
                        }
                    }))
                    .apply(Count.perElement())
                    .apply("CreateKey", ParDo.of(new DoFn<KV<String, Long>, KV<String, KV<String, Long>>>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                            KV<String, Long> element = c.element();
                            String key = element.getKey();
                            c.output(KV.of("single", KV.of(key, element.getValue())));
                        }
                    }))
                    .apply(GroupByKey.create())
                    .apply("FormatResults",
                            MapElements.via(
                                    new SimpleFunction<KV<String, Iterable<KV<String, Long>>>, String>() {
                                        @Override
                                        public String apply(KV<String, Iterable<KV<String, Long>>> input) {
                                            return StreamSupport.stream(input.getValue().spliterator(), false)
                                                    .collect((Supplier<ArrayList<KV<String, Long>>>) ArrayList::new,
                                                            (al, kv) -> al.add(KV.of(kv.getKey(), kv.getValue())),
                                                            (sb, kv) -> {
                                                            })
                                                    .stream()
                                                    .sorted((kv1, kv2) -> kv2.getKey().compareTo(kv1.getKey()))
                                                    .collect(StringBuilder::new,
                                                            (sb, kv) -> sb.append(String.format("%20s : %d%n", kv.getKey(), kv.getValue())),
                                                            (sb, kv) -> {
                                                            }).toString();
                                        }
                                    }
                            ))
                    .apply(TextIO.write().withNumShards(1).to("minimal-wordcount-bible"));
            p.run().waitUntilFinish();
        }
    }
    

    This prints output such as:

              zuzims : 1
         zurishaddai : 5
              zuriel : 1
                 zur : 5
                zuph : 3
                zuar : 5
           zorobabel : 3
             zorites : 1
              zoreah : 1
          zorathites : 1
               zorah : 8
              zophim : 1
              zophar : 4
              zophai : 1
              zophah : 2
              zoheth : 1
            zoheleth : 1
               zohar : 4
             zobebah : 1
               zobah : 11
                zoba : 2
                zoar : 10
                zoan : 7
               zizah : 1
                ziza : 2
                 ziz : 1
              zithri : 1
            zipporah : 3
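The sort-and-format step of this answer can also be written with `Comparator` and `Collectors.joining`, which avoids the empty combiner lambdas. The sketch below is plain Java so that it runs without the Beam SDK: `Map.Entry` stands in for Beam's `KV`, and the names `DescendingFormat` and `formatDescending` are made up for illustration. In the pipeline, the same stream chain would presumably sit in the body of the FormatResults function.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

class DescendingFormat {

    // Sorts the grouped (word, count) pairs by word in descending order
    // and renders one "word : count" line per pair.
    static String formatDescending(Iterable<Map.Entry<String, Long>> values) {
        return StreamSupport.stream(values.spliterator(), false)
                .sorted(Map.Entry.<String, Long>comparingByKey(Comparator.reverseOrder()))
                .map(e -> String.format("%20s : %d%n", e.getKey(), e.getValue()))
                .collect(Collectors.joining());
    }

    public static void main(String[] args) {
        // Sample values taken from the output listed above.
        List<Map.Entry<String, Long>> counts = Arrays.asList(
                new SimpleEntry<String, Long>("zoar", 10L),
                new SimpleEntry<String, Long>("zuzims", 1L),
                new SimpleEntry<String, Long>("zur", 5L));
        System.out.print(formatDescending(counts));
    }
}
```

Like the answer's own code, this holds every value for the single key in memory, so it only suits result sets that fit on one worker.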
    

    【Comments】:

      【Solution 2】:

      You can copy the Iterable<KV<String, Long>> into a List<KV<String, Long>> and reverse the list with Collections.reverse().
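A minimal sketch of that suggestion, using plain strings instead of Beam's `KV` so it runs without the SDK. The class `ReverseSorted` and the method `reversedCopy` are hypothetical names, and the input is assumed to be sorted in ascending order already, as SortValues produces it in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ReverseSorted {

    // Copies an already-sorted Iterable into a List and reverses it in place.
    // The whole Iterable is materialized, so it must fit in memory.
    static <T> List<T> reversedCopy(Iterable<T> sortedValues) {
        List<T> list = new ArrayList<>();
        sortedValues.forEach(list::add);
        Collections.reverse(list);
        return list;
    }

    public static void main(String[] args) {
        Iterable<String> ascending = Arrays.asList("aaron", "abba", "abel");
        System.out.println(reversedCopy(ascending));
    }
}
```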

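      【Comments】:

      • Thank you. That is certainly true, but I was expecting the Beam API to offer some way to specify a comparator.
      • I ended up following your advice after all, although I used streams for the sorting. See my answer to this same question.
      • That seems reasonable too. SortValues is only really useful when you expect the values for a key not to fit in memory: it is scalable, but not flexible.
      【Solution 3】: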
      import apache_beam as beam

      def sort_data(data):
        # GroupByKey yields an iterable, so build a new sorted list from it.
        # reverse=True gives the descending order the question asks for.
        return sorted(data, key=lambda item: item[0], reverse=True)

      with beam.Pipeline() as pipeline:
        interim = pipeline | 'Data' >> beam.Create([
                ('p', 1),
                ('a', 2),
                ('p', 3),
                ('m', 2),])
        interim = interim | beam.Map(lambda it: (0, it)) # same key
        interim = interim | 'window' >> beam.WindowInto(beam.window.GlobalWindows()) # same window
        interim = interim | beam.GroupByKey() # sink all to one
        interim = interim | beam.Map(lambda item: item[1]) # remove the dummy key
        interim = interim | beam.Map(sort_data) # sort the one which is all
        interim = interim | beam.Map(print)
      

      【Comments】:

      • Could you explain how the author could use this Python code snippet to solve the original problem?