【问题标题】:Apache flink 1.52 Rowtime timestamp is nullApache flink 1.52 Rowtime 时间戳为空
【发布时间】:2019-02-20 22:01:57
【问题描述】:

我正在使用以下代码进行一些查询:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    DataStream<Row> ds = SourceHelp.builder().env(env).consumer010(MyKafka.builder().build().kafkaWithWaterMark2())
            .rowTypeInfo(MyRowType.builder().build().typeInfo())
            .build().source4();
    //,proctime.proctime,rowtime.rowtime
    String sql1 = "select a,b,max(rowtime)as rowtime from user_device group by a,b";
    DataStream<Row> ds2 = TableHelp.builder().tableEnv(tableEnv).tableName("user_device").fields("a,b,rowtime.rowtime")
            .rowTypeInfo(MyRowType.builder().build().typeInfo13())
            .sql(sql1).in(ds).build().result();

    ds2.print();
    // String sql2 = "select a,count(b) as b from user_device2 group by a";
    String sql2 = "select a,count(b) as b,HOP_END(rowtime,INTERVAL '5' SECOND,INTERVAL '30' SECOND) as c from user_device2 group by HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '30' SECOND),a";
    DataStream<Row> ds3 = TableHelp.builder().tableEnv(tableEnv).tableName("user_device2").fields("a,b,rowtime.rowtime")
            .rowTypeInfo(MyRowType.builder().build().typeInfo14())
            .sql(sql2).in(ds2).build().result();

    ds3.print();
    env.execute("test");

注意:对于 sql1,我使用 max 函数和 rowtime,它不起作用,并抛出以下异常:

线程“main”中的异常 org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException:行时间时间戳为空。请做出来 确保定义了正确的 TimestampAssigner 并且流 环境使用 EventTime 时间特性。在 org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625) 在 org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) 在 com.aiicaigroup.water.WaterTest.testRowtimeWithMoreSqls5(WaterTest.java:158) 在 com.aiicaigroup.water.WaterTest.main(WaterTest.java:20) 原因: java.lang.RuntimeException:行时间时间戳为空。请做出来 确保定义了正确的 TimestampAssigner 并且流 环境使用 EventTime 时间特性。在 DataStreamSourceConversion$24.processElement(Unknown Source) at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67) 在 org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) 在 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558) 在 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533) 在 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513) 在 org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:628) 在 org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:581) 在 org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) 在 org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) 在 org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) 在 com.aiicaigroup.TableHelp$1.processElement(TableHelp.java:42) 在 com.aicaigroup.TableHelp$1.processElement(TableHelp.java:39) 在 org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) 在 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558) 在 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533) 在 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513) 在 org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) 在 org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) 在 org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) 在 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558) 在 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533) 在 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513) 在 org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) 在 org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) 在 org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) 在 org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:151) 在 org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39) 在 org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88) 在 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) 在 org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) 在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) 在 java.lang.Thread.run(Thread.java:748) 2018-09-17 09:51:53.679 [Kafka 0.10 Fetcher for Source: Custom Source -> Map -> from: (a, b, rowtime) -> select: (a, b, CAST(rowtime) AS rowtime) (2/8)] 信息 Oakafka.clients.consumer .internals.AbstractCoordinator - 发现 组协调员 172.16.11.91:9092 (id: 2147483647 rack: null) 测试。

然后我尝试像这样“从 user_device 中选择 a、b、rowtime”来更新 sql1,并且它可以工作。 那么如何修复错误呢?第一个 sql 应该使用 group by,第二个 sql 应该使用 rowtime by 时间窗口。 3QS

【问题讨论】:

    标签: java apache-flink


    【解决方案1】:

    我从 1.6 开始 flink,遇到了类似的问题。 通过这些步骤解决:

    • 使用 assignTimestampsAndWatermarks ,只需使用默认和常规实现 BoundedOutOfOrdernessTimestampExtractor。您需要编写 extractTimestamp 函数来提取时间戳值并在构造函数中声明窗口间隔。
    • 在字段末尾追加 ,proctime.proctime,rowtime.rowtime(我使用 fromDataStream(Flink 1.6) 将流转换为表格)
    • 如果您想使用存在字段作为行时间。比如数据源字段是“a,clicktime,c”,可以声明“a,clicktime.rowtime,c”

    希望它能帮到你。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-10-18
      • 1970-01-01
      • 2020-07-31
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多