First, a quick plug for my friend Brother Ji's blog: https://me.csdn.net/weixin_47482194
His posts are well worth reading and have made the CSDN front-page recommendations several times.
Now to the main topic. If you work with Flink SQL, you can't avoid Kafka, and when writing to Kafka you may well have run into this problem:
Exception in thread "main" org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
Er, you must be joking... even the most basic statement like select count(*) from table isn't supported????
The official explanation: this is caused by Flink's internal retract mechanism. Until changelogs are supported across the whole pipeline, an append-only message queue like Kafka cannot support Retract/Upsert writes.
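To make the failing pattern concrete, here is a minimal sketch (the KAFKA_SQL / SINK_KAFKA_SQL DDL strings are the same ones used in the full example below; kafka_table2 is the append-only Kafka sink):

// Minimal sketch reproducing the problem, reusing the DDL from the full example below.
tableEnv.executeSql(KAFKA_SQL);        // Kafka source table
tableEnv.executeSql(SINK_KAFKA_SQL);   // append-only Kafka sink (kafka_table2)
// GROUP BY produces update/retract messages, so the append-only Kafka sink
// rejects the statement with the exception shown above.
tableEnv.executeSql("insert into kafka_table2 " +
        "SELECT MAX(ts), user_id, behavior, COUNT(*) " +
        "FROM kafka_table GROUP BY user_id, behavior");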
Fortunately, a Table can be converted back into a DataStream. Here is the code (in my case I take the top N per group):
If you find connecting to Kafka too much hassle, you can generate the data with a custom source instead of reading from Kafka (see the source snippet after the main class).
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class FlinkTopN2Doris {

    private static final String KAFKA_SQL = "CREATE TABLE kafka_table (" +
            " category_id STRING," +
            " user_id STRING ," +
            " item_id STRING ," +
            " behavior STRING ," +
            " ts STRING ," +
            // " proctime as PROCTIME() ," +
            " row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(cast(ts AS BIGINT), 'yyyy-MM-dd HH:mm:ss'))," +
            " WATERMARK FOR row_ts AS row_ts - INTERVAL '5' SECOND " +
            ") WITH (" +
            " 'connector' = 'kafka'," +
            " 'topic' = 'flink_test'," +
            " 'properties.bootstrap.servers' = '192.168.12.188:9092'," +
            " 'properties.group.id' = 'test1'," +
            " 'format' = 'json'," +
            " 'scan.startup.mode' = 'earliest-offset'" +
            ")";

    private static final String SINK_KAFKA_SQL = "CREATE TABLE kafka_table2 (" +
            " ts STRING," +
            " user_id STRING ," +
            " behavior STRING ," +
            " row_num BIGINT " +
            ") WITH (" +
            " 'connector' = 'kafka'," +
            " 'topic' = 'flink_test2'," +
            " 'properties.bootstrap.servers' = '192.168.12.188:9092'," +
            " 'properties.group.id' = 'test1'," +
            " 'format' = 'json'," +
            " 'scan.startup.mode' = 'earliest-offset'" +
            ")";

    private static final String PRINT_SQL = "create table sink_print (" +
            " p_count BIGINT ," +
            " b STRING " +
            ") with ('connector' = 'print' )";

    private static final String PRINT_SQL2 = "create table sink_print2 (" +
            " a STRING," +
            " b STRING," +
            " c STRING," +
            " d BIGINT " +
            ") with ('connector' = 'print' )";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        bsEnv.enableCheckpointing(5000);
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        //todo read the generated data with Flink, transform it, then write it to Kafka
        tableEnv.executeSql(KAFKA_SQL);
        tableEnv.executeSql(PRINT_SQL);
        tableEnv.executeSql(PRINT_SQL2);
        tableEnv.executeSql(SINK_KAFKA_SQL);
        // tableEnv.executeSql("select * from kafka_table").print();

        //todo group by behavior, then count the distinct users
        // tableEnv.executeSql("insert into sink_print select COUNT(DISTINCT user_id) AS uv, behavior from kafka_table GROUP BY behavior ");

        //todo top-N: keep only the latest row per key
        /*String top1Sql = "insert into sink_print2 SELECT * " +
                "FROM (" +
                " SELECT user_id,behavior," +
                " ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) as row_num" +
                " FROM kafka_table ) " +
                "WHERE row_num = 1";
        tableEnv.executeSql(top1Sql);*/

        String top1Sql = "insert into sink_print2 SELECT * " +
                "FROM (" +
                " SELECT ts,user_id,behavior," +
                " ROW_NUMBER() OVER (PARTITION BY behavior ORDER BY ts DESC) as row_num" +
                " FROM kafka_table ) " +
                "WHERE row_num = 1";
        // tableEnv.executeSql(top1Sql);

        // TODO: 2020/7/31 write the result to Kafka
        String sinkToKafka = "SELECT * " +
                "FROM (" +
                " SELECT ts,user_id,behavior," +
                " ROW_NUMBER() OVER (PARTITION BY behavior ORDER BY ts DESC) as row_num" +
                " FROM kafka_table ) " +
                "WHERE row_num = 1";
        // tableEnv.executeSql(sinkToKafka);

        Table table = tableEnv.sqlQuery(sinkToKafka);
        // DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(table, Row.class);
        // convert the updating Table into a retract stream of (change flag, record) pairs
        DataStream<Tuple2<Boolean, Cookies>> retractStream = tableEnv.toRetractStream(table, Cookies.class);
        DataStream<String> sinkKafka = retractStream.flatMap(new FlatMapFunction<Tuple2<Boolean, Cookies>, String>() {
            @Override
            public void flatMap(Tuple2<Boolean, Cookies> value, Collector<String> out) throws Exception {
                // System.out.println("value.f0 = " + value.f0);
                //todo note: there seem to be no 'false' (retract) records left here, only 'true' ones
                String outStr = JSONObject.toJSONString(value.f1);
                out.collect(outStr);
            }
        });

        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
                "flink_doris",
                new SimpleStringSchema(),   // serialization schema
                getProperties());
        myProducer.setWriteTimestampToKafka(true);
        sinkKafka.addSink(myProducer).name("ods").uid("ods").setParallelism(1);

        //todo keep only the latest record
        /* tuple2DataStream.flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, JSONObject>() {
            @Override
            public void flatMap(Tuple2<Boolean, Row> value, Collector<JSONObject> out) throws Exception {
                Boolean lastValue = value.f0;
                Row row = value.f1;
                JSONObject json = new JSONObject();
                json.put("state", lastValue);
                json.put("ts", row.getField(0));
                json.put("user_id", row.getField(1));
                json.put("behavior", row.getField(2));
                json.put("row_num", row.getField(3));
                out.collect(json);
            }
        }).print();*/

        // insert into kafka_table2
        bsEnv.execute("Executing job......................");
    }

    public static Properties getProperties() {
        Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", "192.168.12.188:9092");
        producerConfig.setProperty("acks", "all");   // the producer property is "acks", not "ack"
        producerConfig.setProperty("buffer.memory", "102400");
        producerConfig.setProperty("compression.type", "snappy");
        producerConfig.setProperty("batch.size", "1000");
        producerConfig.setProperty("linger.ms", "1");
        producerConfig.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "");
        return producerConfig;
    }

    /**
     * todo the final output:
     * +I(1535452032,130701,pv,1)
     * +I(1512316772,106260,fav,1)
     * +I(1512316741,1015357,buy,1)
     * +I(1512316781,1014597,cart,1)
     */
}
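One note: the Cookies class passed to toRetractStream above isn't shown in the post. Presumably it is just a POJO whose fields mirror the query's output columns; a hypothetical sketch:

// Hypothetical POJO (the real Cookies class is not shown in the post); its
// fields mirror the query's output columns ts, user_id, behavior, row_num.
public class Cookies {
    public String ts;
    public String user_id;
    public String behavior;
    public Long row_num;

    // Flink POJO types need a public no-argument constructor
    public Cookies() {
    }
}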
A custom source that generates the data; the stream is then converted and registered as a table:
// a simple one-shot test source: emits a single two-field Row and finishes
SingleOutputStreamOperator<Row> ds = env.addSource(new RichSourceFunction<Row>() {
    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        Row r = new Row(2);
        r.setField(0, "a");
        r.setField(1, "a");
        ctx.collect(r);
    }

    @Override
    public void cancel() {
    }
}).returns(Types.ROW(Types.STRING, Types.STRING));

// register the stream as view "t" with fields id, order_key and a processing-time attribute
blinkStreamTableEnv.createTemporaryView("t", ds, "id,order_key,proctime.proctime");
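Once the view is registered you can query it like any other table. A minimal sketch, assuming blinkStreamTableEnv is the StreamTableEnvironment the stream above was created on:

// query the registered view and print the result as an append stream
Table result = blinkStreamTableEnv.sqlQuery("SELECT id, order_key FROM t");
blinkStreamTableEnv.toAppendStream(result, Row.class).print();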
In fact, there is also a way to write to Kafka directly with pure SQL. We can follow the community example; the original article is here:
https://mp.weixin.qq.com/s/MSs7HSaegyWWU3Fig2PYYA
1. First, create the two classes below. In your project, create the package org.apache.flink.streaming.connectors.kafka and put both classes into it, so that they override the implementations inside the official Kafka connector (a sketch of the idea follows the file list).
- KafkaTableSinkBase.java
  https://github.com/sunjincheng121/know_how_know_why/blob/master/QA/upsertKafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
- KafkaTableSourceSinkFactoryBase.java
  https://github.com/sunjincheng121/know_how_know_why/blob/master/QA/upsertKafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
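The trick works because classes compiled in your own project are normally picked up ahead of identically named classes inside the dependency jars, so as long as the package and class name match exactly, your copy shadows the connector's. A skeleton of what step 1 looks like (file path illustrative; the body comes from the linked repo):

// src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
// The package declaration must match Flink's own package exactly so that this
// class shadows the one shipped inside flink-connector-kafka.
package org.apache.flink.streaming.connectors.kafka;

public abstract class KafkaTableSinkBase {
    // ... implementation copied from the repo above, adapted to your Flink version ...
}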
2. The example targets Flink 1.10 while I'm running 1.11.0, so the lines that no longer compile have to be commented out:
OK, let's test our top-N code:
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @program: flink-tech
 * @description: write Flink's top-N to Kafka, entirely in SQL
 * @author: Mr.Wang
 * @create: 2020-07-31 14:46
 **/
public class FlinkTopNBySql {

    private static final String KAFKA_SQL = "CREATE TABLE kafka_table (" +
            " category_id STRING," +
            " user_id STRING ," +
            " item_id STRING ," +
            " behavior STRING ," +
            " ts STRING ," +
            // " proctime as PROCTIME() ," +
            " row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(cast(ts AS BIGINT), 'yyyy-MM-dd HH:mm:ss'))," +
            " WATERMARK FOR row_ts AS row_ts - INTERVAL '5' SECOND " +
            ") WITH (" +
            " 'connector' = 'kafka'," +
            " 'topic' = 'flink_test'," +
            " 'properties.bootstrap.servers' = '192.168.12.188:9092'," +
            " 'properties.group.id' = 'test1'," +
            " 'format' = 'json'," +
            " 'scan.startup.mode' = 'earliest-offset'" +
            ")";

    private static final String PRINT_SQL = "create table sink_print (" +
            " b BIGINT " +
            ") with ('connector' = 'print' )";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        bsEnv.enableCheckpointing(5000);
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        //todo read the generated data with Flink, transform it, then write it to Kafka
        tableEnv.executeSql(KAFKA_SQL);
        tableEnv.executeSql(PRINT_SQL);

        String top1Sql = "insert into sink_print SELECT count(category_id) from kafka_table GROUP BY behavior";
        tableEnv.executeSql(top1Sql);
    }
}
The console output: