【问题标题】:How to make select field from select * Flink如何从 select * Flink 中选择字段
【发布时间】:2018-06-23 21:59:56
【问题描述】:

我正在尝试加入 2 个选择。

我必须在代码中进行查询,看起来像这个查询

select *
from  Data
where numPers > 10 && Object = P1

还有这个

select *
from  Data
where numPers < 20 && Object == P1

我只需要数据中的时间戳而不重复

我使用的程序代码如下所示

object Prog {

  def main(args: Array[String]) : Unit = {
    org.apache.log4j.BasicConfigurator.configure()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val csvTableSource = CsvTableSource
      .builder
      .path("src/main/resources/data.stream")
      .field("numPers", Types.INT)
      .field("Object", Types.STRING)
      .field("TIMESTAMP", Types.STRING)
      .fieldDelimiter(",")
      .ignoreFirstLine
      .ignoreParseErrors
      .commentPrefix("%")
      .build()

    tableEnv.registerTableSource("Data", csvTableSource)

    val table = tableEnv.scan("Data") //this works
      .filter("numPers > 10")
      .select("*")


    val ds = tableEnv.toAppendStream(table, classOf[Row])

    ds.print()
    env.execute()
  }
}

但是如何将第二个查询添加到第一个查询中?

【问题讨论】:

  • 我不明白你的问题。你想加入哪个表?只有一张表 (Data),因此没有可加入的内容。
  • 我很抱歉问题不准确,我需要从 numPers > 10 && Object = P1 的数据中选择 * 并从 numPers

标签: select filter stream apache-flink sliding-window


【解决方案1】:

如果我正确理解您的要求,您不需要加入,只需一个 BETWEEN 谓词:

val query = "SELECT * FROM Data WHERE numPers BETWEEN 10 AND 20 AND Object = P1"
val table = tableEnv.sqlQuery(query)

【讨论】:

  • 是的,在这种情况下它可以工作。我还有另一个无法实现的数据,我想使用这种方法进行查询。有一个带有字符串值的字段。如果想选择例如如何打印满足下一个条件的所有时间戳 where field = value1 && Object = P1 INNER JOIN BY TIMESTAMP where field = value2 && Object = P1
猜你喜欢
  • 1970-01-01
  • 2017-04-06
  • 1970-01-01
  • 2010-12-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-07-19
相关资源
最近更新 更多