[Question title]: How to implement Window Function in Apache Flink?
[Posted]: 2020-08-30 05:04:49
[Question]:

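Hi everyone, I have a Kafka topic as a source, and I group it into 1-minute windows. Within each window I want to create new columns using SQL window functions. For example, I'd like to use: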

  • SUM(amount) OVER(PARTITION BY
  • COUNT(user) OVER(PARTITION BY
  • ROW_NUMBER() OVER(PARTITION BY

Can I use DataStream functions for these operations?

How can I convert my Kafka data into a Table and query it with sqlQuery?

The destination is another Kafka topic.

    val stream = senv
      .addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))

I have already tried this:

    val tableA = tableEnv.fromDataStream(stream, 'user, 'product, 'amount)

but I get the following error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type.

Test data:

1,"beer",3
1,"beer",1
2,"beer",3
3,"diaper",4
4,"diaper",1
5,"diaper",5
6,"rubber",2

Example query:

    SELECT
      user, product, amount,
      COUNT(user) OVER(PARTITION BY product) AS count_product
    FROM table;

Expected result:

1,"beer",3,3
1,"beer",1,3
2,"beer",3,3
3,"diaper",4,3
4,"diaper",1,3
5,"diaper",5,3
6,"rubber",2,1
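To make the target semantics concrete, here is a minimal pure-Scala sketch (no Flink involved) that computes, for the rows of a single window, the three OVER(PARTITION BY product) columns mentioned in the question. The case class Sale and the ORDER BY amount used for ROW_NUMBER are assumptions for illustration only; the COUNT column reproduces the expected result above.

```scala
// Hypothetical record type for one parsed row (not part of any Flink API).
final case class Sale(user: Int, product: String, amount: Int)

object OverPartitionSketch {
  // Test data from the question, treated as the contents of one window.
  val rows: List[Sale] = List(
    Sale(1, "beer", 3), Sale(1, "beer", 1), Sale(2, "beer", 3),
    Sale(3, "diaper", 4), Sale(4, "diaper", 1), Sale(5, "diaper", 5),
    Sale(6, "rubber", 2)
  )

  // Returns (user, product, amount, count, sum, rowNumber) for every input row:
  //   count     = COUNT(user)  OVER (PARTITION BY product)
  //   sum       = SUM(amount)  OVER (PARTITION BY product)
  //   rowNumber = ROW_NUMBER() OVER (PARTITION BY product ORDER BY amount)  (ordering assumed)
  def withOverColumns(rows: List[Sale]): List[(Int, String, Int, Int, Int, Int)] = {
    // Remember each row's position so that duplicate rows stay distinguishable.
    val indexed: List[(Sale, Int)] = rows.zipWithIndex
    val partitions: Map[String, List[(Sale, Int)]] = indexed.groupBy(_._1.product)

    // ROW_NUMBER: stable-sort each partition by amount and number its rows from 1.
    val rowNumber: Map[Int, Int] = partitions.values.flatMap { part =>
      part.sortBy(_._1.amount).zipWithIndex.map { case ((_, pos), i) => pos -> (i + 1) }
    }.toMap

    // Unlike GROUP BY, every input row is kept and gets its partition's aggregates.
    indexed.map { case (s, pos) =>
      val part = partitions(s.product).map(_._1)
      (s.user, s.product, s.amount, part.size, part.map(_.amount).sum, rowNumber(pos))
    }
  }

  def main(args: Array[String]): Unit =
    withOverColumns(rows).foreach(println)
}
```

The key difference from a GROUP BY is visible in the last map: the number of output rows equals the number of input rows, and each row only gains extra columns.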

[Discussion]:

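  • Please post a sample of the data from Kafka.
  • Please also post the SQL query you want to implement.
  • Your stream is of type String, and you are trying to access fields from it. That explains your exception: Too many fields referenced from an atomic type
  • Hi, the query is: SELECT user, product, amount, COUNT(user) OVER(PARTITION BY product) AS count_product FROM table;
  • How do I apply that schema so that I can convert the stream into a table?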

Tags: scala apache-kafka streaming apache-flink


[Solution 1]:

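You need to parse each string into separate fields first, and then rename those fields when converting the stream to a table.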

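import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// For testing; in the real job this is the Kafka source
val stream = env.fromElements("1,beer,3",
 "1,beer,1","2,beer,3","3,diaper,4","4,diaper,1","5,diaper,5","6,rubber,2")

// Parse each CSV line into a (user, product, amount) tuple
val parsed = stream.map(x => {
 val arr = x.split(",")
 (arr(0).toInt, arr(1), arr(2).toInt)
})

// Rename the tuple fields _1, _2, _3 to user, product, amount
val tableA = tEnv.fromDataStream(parsed, $"_1" as "user", $"_2" as "product", $"_3" as "amount")

// Example query; USER is a reserved keyword in Flink SQL, so the column is escaped with backticks
val result = tEnv.sqlQuery(s"SELECT `user`, product, amount FROM $tableA")

val rs = result.toAppendStream[(Int, String, Int)]

rs.print()
env.execute()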
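The test data in the question quotes the product name (1,"beer",3), which the split above would keep as part of the string. Below is a minimal sketch of a more defensive parser, assuming a hypothetical case class Purchase. With a case class instead of a tuple, Flink should derive the column names user, product and amount from its fields, so the renaming step would not be needed.

```scala
import scala.util.Try

// Hypothetical record type for one Kafka message such as 1,"beer",3.
final case class Purchase(user: Int, product: String, amount: Int)

object PurchaseParser {
  // Parses one CSV line; returns None for malformed input instead of throwing,
  // so a single bad record does not fail the whole job.
  def parse(line: String): Option[Purchase] =
    line.split(",").map(_.trim) match {
      case Array(user, product, amount) =>
        // Strip the surrounding double quotes used in the test data.
        val name = product.stripPrefix("\"").stripSuffix("\"")
        Try(Purchase(user.toInt, name, amount.toInt)).toOption
      case _ => None
    }
}
```

In the job this could be applied as stream.flatMap(line => PurchaseParser.parse(line).toList) before calling tEnv.fromDataStream, so malformed records are dropped rather than causing a NumberFormatException.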

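I'm not sure how the required window functions can be implemented in Flink SQL. Alternatively, the same result can be achieved in plain Flink (the DataStream API) as follows: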

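import org.apache.flink.streaming.api.scala._
// Use the Scala ProcessWindowFunction. The Java one (...functions.windowing.ProcessWindowFunction)
// takes a java.lang.Iterable, a likely cause of "method process overrides nothing"
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

parsed.keyBy(x => x._2) // key by product
      // Event-time windows need timestamps and watermarks assigned on the stream;
      // for the 1-minute windows from the question use Time.minutes(1)
      .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
      .process(new ProcessWindowFunction[
        (Int, String, Int), (Int, String, Int, Int), String, TimeWindow
      ]() {
        // Emit every element of the window together with the number of elements
        // for this product, like COUNT(user) OVER (PARTITION BY product)
        override def process(key: String, context: Context,
                             elements: Iterable[(Int, String, Int)],
                             out: Collector[(Int, String, Int, Int)]): Unit = {
          val lst = elements.toList
          lst.foreach(x => out.collect((x._1, x._2, x._3, lst.size)))
        }
      })
      .print()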
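The keyBy, tumbling window and ProcessWindowFunction combination above can be simulated in plain Scala to see which rows end up counted together. This sketch assumes a hypothetical millisecond timestamp on each event, non-negative timestamps and a zero window offset; under those assumptions Flink's own window start computation (TimeWindow.getWindowStartWithOffset) reduces to ts - ts % size.

```scala
object TumblingWindowSketch {
  // Hypothetical event: a parsed Kafka record plus its timestamp in milliseconds.
  final case class Event(timestampMs: Long, user: Int, product: String, amount: Int)

  // Start of the tumbling window containing ts (assumes ts >= 0 and offset 0).
  def windowStart(ts: Long, sizeMs: Long): Long = ts - ts % sizeMs

  // Groups events by (product, window), like keyBy + window, then emits every
  // event with the size of its group, like the ProcessWindowFunction above.
  // Output: (windowStart, user, product, amount, countInWindow), ordered by window, then product.
  def countPerKeyAndWindow(events: List[Event], sizeMs: Long): List[(Long, Int, String, Int, Int)] =
    events
      .groupBy(e => (e.product, windowStart(e.timestampMs, sizeMs)))
      .toList
      .sortBy { case ((product, start), _) => (start, product) }
      .flatMap { case ((_, start), group) =>
        group.map(e => (start, e.user, e.product, e.amount, group.size))
      }

  def main(args: Array[String]): Unit = {
    val oneMinute = 60000L
    val events = List(
      Event(0L, 1, "beer", 3), Event(30000L, 1, "beer", 1),
      Event(61000L, 2, "beer", 3), Event(5000L, 3, "diaper", 4)
    )
    countPerKeyAndWindow(events, oneMinute).foreach(println)
  }
}
```

With 1-minute windows, a row is only counted against rows of the same product in the same minute, so the counts can be smaller than in the unwindowed expected result from the question (here the third beer row lands in the next window on its own).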

[Discussion]:

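  • Thanks! I used the syntax fromDataStream(parsed, 'user, 'product, 'amount).
  • Only flink_streaming_scala_2.11, the library that contains the StreamExecutionEnvironment class.
  • "override def process()" gives me an error saying that method 'process' overrides nothing.
  • What happens if you remove the process method?
  • What went wrong?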