【Posted】: 2020-01-07 14:52:27
【Question】:
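I am trying to run a streaming top-n query with Flink SQL, but I cannot get the "optimized version" outlined in the Flink docs to work. The setup is as follows:
I have a Kafka topic where each record contains a tuple (GUID, reached score, maximum possible score). Think of a student taking an assessment, where the tuple represents how many points they achieved.
What I want is a list of the five GUIDs with the highest score measured in percent (i.e. ordered by SUM(reached_score) / SUM(maximum possible score)).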
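To make the intended ranking concrete, here is a minimal plain-Java sketch of the same computation outside Flink. The class and method names (`ScoreRanking`, `Score`, `topN`) are hypothetical and only serve as illustration. The three sample records are an assumption: they are chosen so that the per-GUID sums match the totals that appear in the query output further down (97/200 and 229/400).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ScoreRanking {

    /** One record: (guid, reached_score, max_score). Also used for per-GUID sums. */
    static final class Score {
        final String guid;
        final long reached;
        final long max;

        Score(String guid, long reached, long max) {
            this.guid = guid;
            this.reached = reached;
            this.max = max;
        }

        /** reached / max as a fraction; not guarded against max == 0. */
        double ratio() {
            return (double) reached / max;
        }
    }

    /** Sums reached and max scores per GUID, then returns the n best by ratio. */
    static List<Score> topN(List<Score> records, int n) {
        Map<String, Score> totals = new HashMap<>();
        for (Score r : records) {
            totals.merge(r.guid, r,
                (a, b) -> new Score(a.guid, a.reached + b.reached, a.max + b.max));
        }
        List<Score> ranked = new ArrayList<>(totals.values());
        // Highest ratio first, like ORDER BY score DESC.
        ranked.sort((a, b) -> Double.compare(b.ratio(), a.ratio()));
        return new ArrayList<>(ranked.subList(0, Math.min(n, ranked.size())));
    }

    public static void main(String[] args) {
        List<Score> records = new ArrayList<>();
        // Assumed sample input, not taken from the actual Kafka topic.
        records.add(new Score("63992935-9684-4285-8c2b-1fd57b51b48f", 97, 200));
        records.add(new Score("d7847f58-a4d9-40f8-a38d-161821b48481", 67, 200));
        records.add(new Score("d7847f58-a4d9-40f8-a38d-161821b48481", 162, 200));

        for (Score s : topN(records, 5)) {
            System.out.println(s.guid + " " + s.reached + "/" + s.max + " = " + s.ratio());
        }
    }
}
```

This is a batch computation over a finite list. The streaming query has to maintain the same result incrementally, which is where the update messages discussed in this question come from.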
I start by aggregating the scores, grouped by GUID:
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
Table scores = tableEnv.fromDataStream(/* stream from kafka */, "guid, reached_score, max_score");
tableEnv.registerTable("scores", scores);
Table aggregatedScores = tableEnv.sqlQuery(
"SELECT " +
" guid, " +
" SUM(reached_score) as reached_score, " +
" SUM(max_score) as max_score, " +
" SUM(reached_score) / CAST(SUM(max_score) AS DOUBLE) as score " +
"FROM scores " +
"GROUP BY guid");
tableEnv.registerTable("agg_scores", aggregatedScores);
The resulting table contains an unsorted list of aggregated scores. I then tried to feed it into the Top-N query as used in the Flink docs:
Table topN = tableEnv.sqlQuery(
"SELECT guid, reached_score, max_score, score, row_num " +
"FROM (" +
" SELECT *," +
" ROW_NUMBER() OVER (ORDER BY score DESC) as row_num" +
" FROM agg_scores)" +
"WHERE row_num <= 5");
tableEnv.toRetractStream(topN, Row.class).print();
Running this query behaves as expected and produces multiple updates whenever the order of the elements changes:
// add first entry
6> (true,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,1)
// add a second entry with lower score below the first one
7> (true,d7847f58-a4d9-40f8-a38d-161821b48481,67,200,0.335,2)
// update the second entry with a much higher score
8> (false,d7847f58-a4d9-40f8-a38d-161821b48481,67,200,0.335,2)
1> (true,d7847f58-a4d9-40f8-a38d-161821b48481,229,400,0.5725,1)
3> (true,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,2)
2> (false,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,1)
Then I removed row_num from the projection, as the docs suggest:
Table topN = tableEnv.sqlQuery(
"SELECT guid, reached_score, max_score, score " +
"FROM (" +
" SELECT *," +
" ROW_NUMBER() OVER (ORDER BY score DESC) as row_num" +
" FROM agg_scores)" +
"WHERE row_num <= 5");
Running it on a similar data set:
// add first entry
4> (true,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56)
// add a second entry with lower score below the first one
5> (true,d7847f58-a4d9-40f8-a38d-161821b48481,76,200,0.38)
// update the second entry with a much higher score
7> (true,d7847f58-a4d9-40f8-a38d-161821b48481,354,400,0.885)
1> (true,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56) <-- ???
8> (false,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56) <-- ???
6> (false,d7847f58-a4d9-40f8-a38d-161821b48481,76,200,0.38)
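One way to read output like this is to fold the messages into a materialized view, where a `true` flag adds a row and a `false` flag retracts it. The sketch below is plain Java, not Flink code, and `RetractFold`, `Msg` and `fold` are hypothetical names. It rests on one assumption: the `N>` prefix printed by `print()` is the index of the parallel sink subtask, so lines from different subtasks may be interleaved in a different order than they were emitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RetractFold {

    /** One retract-stream message: add == true inserts the row, false retracts it. */
    static final class Msg {
        final boolean add;
        final String row;

        Msg(boolean add, String row) {
            this.add = add;
            this.row = row;
        }
    }

    /**
     * Applies add/retract messages to a multiset of rows and returns the rows
     * whose count is non-zero. Counts are summed, so the result does not depend
     * on the order in which the messages are applied.
     */
    static Map<String, Integer> fold(List<Msg> messages) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Msg m : messages) {
            counts.merge(m.row, m.add ? 1 : -1, Integer::sum);
        }
        counts.values().removeIf(c -> c == 0);
        return counts;
    }

    public static void main(String[] args) {
        String a = "63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56";
        String bOld = "d7847f58-a4d9-40f8-a38d-161821b48481,76,200,0.38";
        String bNew = "d7847f58-a4d9-40f8-a38d-161821b48481,354,400,0.885";

        // The six messages from the output above, in printed order.
        List<Msg> messages = new ArrayList<>();
        messages.add(new Msg(true, a));
        messages.add(new Msg(true, bOld));
        messages.add(new Msg(true, bNew));
        messages.add(new Msg(true, a));
        messages.add(new Msg(false, a));
        messages.add(new Msg(false, bOld));

        // Rows that remain after all adds and retractions are applied.
        fold(messages).keySet().forEach(System.out::println);
    }
}
```

Folded this way, the six messages leave exactly one row per GUID: the 0.56 row for the first entry and the 0.885 row for the second. That shows the end state is consistent, but it does not explain why the first entry is retracted and re-added at all.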
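What I do not understand is:
- Why is the first entry (63992935-9684-4285-8c2b-1fd57b51b48f) removed and added again, i.e. why is it touched at all?
- Why is the second entry first added (a second time) and then removed? Wouldn't that technically remove it from the data stream?
Both are obviously related to the change in ordering, but isn't that exactly what the optimized top-n query (written further down in the documentation) is supposed to solve?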
【Comments】:
Tags: apache-flink flink-streaming flink-sql