【问题标题】:Optimized Top-N query using Flink SQL使用 Flink SQL 优化 Top-N 查询
【发布时间】:2020-01-07 14:52:27
【问题描述】:

我正在尝试使用 Flink SQL 运行流式 top-n 查询,但无法使“优化版本”outlined in the Flink docs 正常工作。设置如下:

我有一个 Kafka 主题,其中每条记录都包含一个元组(GUID、达到分数、最大可能分数)。把他们想象成一个学生参加评估,元组代表他取得了多少分。

我想要得到的是五个 GUID 的列表,其中最高分数以百分比衡量(即按 SUM(reached_score) / SUM(maximum possible score) 排序)。

我首先汇总分数并按 GUID 分组:

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

Table scores = tableEnv.fromDataStream(/* stream from kafka */, "guid, reached_score, max_score");
tableEnv.registerTable("scores", scores);

Table aggregatedScores = tableEnv.sqlQuery(
        "SELECT " +
        "  guid, " +
        "  SUM(reached_score) as reached_score, " +
        "  SUM(max_score) as max_score, " +
        "  SUM(reached_score) / CAST(SUM(max_score) AS DOUBLE) as score " +
        "FROM scores " +
        "GROUP BY guid");

tableEnv.registerTable("agg_scores", aggregatedScores);

结果表包含一个未排序的汇总分数列表。然后我尝试将它输入到 Flink 文档中使用的 Top-N 查询中:

Table topN = tableEnv.sqlQuery(
        "SELECT guid, reached_score, max_score, score, row_num " +
        "FROM (" +
        "   SELECT *," +
        "       ROW_NUMBER() OVER (ORDER BY score DESC) as row_num" +
        "   FROM agg_scores)" +
        "WHERE row_num <= 5");


tableEnv.toRetractStream(topN, Row.class).print();

运行此查询会按预期运行,如果元素的顺序发生变化,则会导致多次更新。

// add first entry
6> (true,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,1)

// add a second entry with lower score below the first one
7> (true,d7847f58-a4d9-40f8-a38d-161821b48481,67,200,0.335,2)

// update the second entry with a much higher score
8> (false,d7847f58-a4d9-40f8-a38d-161821b48481,67,200,0.335,2)
1> (true,d7847f58-a4d9-40f8-a38d-161821b48481,229,400,0.5725,1)
3> (true,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,2)
2> (false,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,1)

然后我按照文档中的建议从投影中删除了 row_number:

Table topN = tableEnv.sqlQuery(
    "SELECT guid, reached_score, max_score, score " +
    "FROM (" +
    "   SELECT *," +
    "       ROW_NUMBER() OVER (ORDER BY score DESC) as row_num" +
    "   FROM agg_scores)" +
    "WHERE row_num <= 5");

运行类似的数据集:

// add first entry
4> (true,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56)

// add a second entry with lower score below the first one
5> (true,d7847f58-a4d9-40f8-a38d-161821b48481,76,200,0.38)

// update the second entry with a much higher score
7> (true,d7847f58-a4d9-40f8-a38d-161821b48481,354,400,0.885)
1> (true,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56) <-- ???
8> (false,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56) <-- ???
6> (false,d7847f58-a4d9-40f8-a38d-161821b48481,76,200,0.38)

我不明白的是:

  • 为什么第一个条目 (63992935-9684-4285-8c2b-1fd57b51b48f) 被删除并再次添加/仍然被触及
  • 为什么第二个条目首先被添加(第二次)然后被删除。这不会导致从技术上将其从数据流中删除吗?

两者显然都与排序变化的顺序有关,但这不就是优化的top-n查询(written further down in the documentation)应该解决的问题吗?

【问题讨论】:

    标签: apache-flink flink-streaming flink-sql


    【解决方案1】:

    我已经检查过这个问题,也可以在我的本地环境中重现。我也做了一些调查,原因是:

    “我们没有针对某些场景做这样的优化,你的案例似乎就是其中之一”。

    但是,根据用户文档,我认为在您的场景中包含此类优化也是有效的请求。对我来说这看起来像是一个 BUG,我们声称进行了一些优化,但没有成功。

    我创建了一个问题:https://issues.apache.org/jira/browse/FLINK-15497 来跟踪这个问题,希望我们可以在即将发布的 1.9.2 和 1.10.0 版本中修复它。

    感谢您报告此事。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-11-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多