【问题标题】:Apache Flink streaming word count aggregate over linesApache Flink 流式传输字数按行汇总
【发布时间】:2016-10-31 19:06:48
【问题描述】:

我遇到了一类在批处理中不存在的问题,但对于流式处理来说似乎并不重要。让我们考虑一下经典的字数统计示例:

lines
  .flatMap(_.split("\\W+"))
  .map(word => (word, 1))
  .keyBy(0)
  .sum(1)

这将打印流中每个单词的结果,例如:

input: "foo bar baz foo"
output: (foo, 1) (bar, 1) (baz, 1) (foo, 2)

我想做的是将每一行作为一个整体处理,然后才打印结果,即在每一行上使用一个窗口:

input: "foo bar baz foo"
output: (foo, 2) (bar, 1) (baz, 1)

显然,基于时间的窗口和基于计数的窗口都不适用于此处。解决问题的正确方法是什么?

【问题讨论】:

  • 您需要将flatMapmap 合并为一个运算符,并进行一些预聚合(即计算每个单词在句子中出现的频率)。因此,输入 foo bar baz foo" 将被转换为 "(foo, 2) (bar, 1) (baz, 1)" before keyBy
  • 我需要对单个单词进行并行操作,然后再将它们分组。
  • 我不明白你所说的“每行窗口”是什么意思。我认为在您的示例中添加更多输入行并显示所需的输出会有所帮助。
  • @FabianHueske 我的意思是我想计算每行的单词,即等到一行中的所有单词都被处理后,再发出任何输出。这只是一个示例,一般问题是:将每个输入记录拆分为不同的部分(也称为 flatMap),并行处理这些部分,然后将它们聚合(按每个初始记录分组)以进行进一步处理。
  • 即逻辑上看起来是这样的:lines.map(line => line.split("\\W+").map(processWord).reduce(aggregateWords)),这是 Mathias 建议的,但是处理功能相当繁重,所以我想以某种方式并行化它:lines.flatMap((_.id, _.split("\\W+"))).map(processWord).groupAllRecords(_.id).reduce(aggregateWords))

标签: apache-flink word-count flink-streaming


【解决方案1】:

即使在批处理模式下也无法同时处理单词和行,因为 Flink 不支持嵌套的groupBy(或keyBy)。但是,如果您想要以下批量字数的流式传输版本:

lines
  .flatMap(line => (lineId,word,1))
  .groupBy(0)
  .reduceGroup {aggregateWords}

其中aggregateWords 迭代该特定键的单词并对其进行计数,然后您可以通过以下方式实现它:对于每一行,您发出单词以及最后的特殊记录,然后使用 GlobalWindow收到特殊记录后触发的自定义触发器。

上一个批处理作业的流版本可能如下所示:

public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements("foo bar baz foo", "yes no no yes", "hi hello hi hello")

            .flatMap(new FlatMapFunction<String, Tuple3<Double, String, Integer>>() {
                @Override
                public void flatMap(String s, Collector<Tuple3<Double, String, Integer>> collector) throws Exception {
                    String[] words = s.split("\\W+");
                    Double lineId = Math.random();
                    for (String w : words) {
                        collector.collect(Tuple3.of(lineId, w, 1));
                    }
                    collector.collect(Tuple3.of(lineId, "\n", 1));
                }
            })
            .keyBy(0)
            .window(GlobalWindows.create())
            .trigger(new Trigger<Tuple3<Double, String, Integer>, GlobalWindow>() {
                @Override
                public TriggerResult onElement(Tuple3<Double, String, Integer> element, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
                    if (element.f1.equals("\n")) {
                        return TriggerResult.FIRE;
                    }
                    return TriggerResult.CONTINUE;
                }

                @Override
                public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
                    return TriggerResult.CONTINUE;
                }

                @Override
                public TriggerResult onEventTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
                    return TriggerResult.CONTINUE;
                }
            })
            .fold(new HashMap<>(), new FoldFunction<Tuple3<Double, String, Integer>, HashMap<String, Integer>>() {
                @Override
                public HashMap<String, Integer> fold(HashMap<String, Integer> hashMap, Tuple3<Double, String, Integer> tuple3) throws Exception {
                    if (!tuple3.f1.equals("\n")) {
                        hashMap.put(tuple3.f1, hashMap.getOrDefault(tuple3.f1, 0) + 1);
                    }
                    return hashMap;
                }
            }).print();

    env.execute("Test");

}

输出:

{bar=1, foo=2, baz=1}
{no=2, yes=2}
{hi=2, hello=2}

【讨论】:

  • 天啊,看起来很复杂!在批处理模式下,这个问题不存在,因为您不需要触发任何东西 - 您只需收集 groupBy 中每个键的所有现有记录,而在流式处理模式下,您需要知道何时收集到足够的信息并需要触发。
猜你喜欢
  • 2016-10-10
  • 1970-01-01
  • 2021-08-01
  • 2022-01-16
  • 2021-06-20
  • 1970-01-01
  • 1970-01-01
  • 2015-12-12
  • 1970-01-01
相关资源
最近更新 更多