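1. Starting with something simple, which still took a lot of fiddling. Yesterday Maven could not download flink-clients.jar; I downloaded it and imported it by hand, but even the simplest code would not run.
Today it still failed with a missing class (org.apache.flink.optimizer.costs.CostEstimator). The missing class is the one that comes with flink-clients.jar.
I tried adding the dependencies again, and this time it worked: the code ran successfully.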
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
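To tell whether the missing-class error is really a classpath problem, a quick check like the one below can help. This is only a minimal sketch: the class name ClasspathCheck and the helper isOnClasspath are made up for illustration and are not part of Flink. As far as I know, CostEstimator itself is packaged in flink-optimizer, which flink-clients pulls in transitively; if that is right, it would also explain why importing flink-clients.jar by hand was not enough, since a manually added jar brings no transitive dependencies with it.

```java
// Hypothetical helper, not part of Flink: checks whether a class is visible on the classpath.
public class ClasspathCheck {

    /** Returns true if the named class can be located by this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Not found, or found but one of its own dependencies is missing
            return false;
        }
    }

    public static void main(String[] args) {
        // The class named in the error message from the text above
        String cls = "org.apache.flink.optimizer.costs.CostEstimator";
        System.out.println(cls + " on classpath: " + isOnClasspath(cls));
    }
}
```

Running it from the same module (so it sees the same Maven dependencies) shows whether the class is reachable before the Flink job is started.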
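import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class SqlTest03 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
        // ingest a DataStream from a simple test source that emits one record per second
        DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(new SourceFunction<Tuple3<Long, String, Integer>>() {
            // cleared by cancel() so that the loop in run() can exit
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple3<Long, String, Integer>> out) throws Exception {
                while (running) {
                    out.collect(new Tuple3<>(1L, "a", 11));
                    Thread.sleep(1000L);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });
        // name the three tuple fields as table columns
        Table table = tableEnv.fromDataStream(ds, $("user"), $("product"), $("amount"));
        // convert back to a stream; the Boolean flag marks add (true) or retract (false) messages
        DataStream<Tuple2<Boolean, Row>> dsRow = tableEnv.toRetractStream(table, Row.class);
        dsRow.print();
        env.execute();
    }
}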
A side note about the execute call at the end: