【发布时间】:2018-04-17 08:22:54
【问题描述】:
我正在尝试编写一个流式传输作业,它将数据流下沉到 postgres 表中。为了提供完整的信息,我的工作基于文章:https://tech.signavio.com/2017/postgres-flink-sink,它建议使用 JDBCOutputFormat。
我的代码如下所示:
98 ...
99 String strQuery = "INSERT INTO public.alarm (entity, duration, first, type, windowsize) VALUES (?, ?, ?, 'dur', 6)";
100
101 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
102 .setDrivername("org.postgresql.Driver")
103 .setDBUrl("jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff")
104 .setQuery(strQuery)
105 .setSqlTypes(new int[] { Types.VARCHAR, Types.INTEGER, Types.VARCHAR}) //set the types
106 .finish();
107
108 DataStream<Row> rows = FilterStream
109 .map((tuple)-> {
110 Row row = new Row(3); // our prepared statement has 3 parameters
111 row.setField(0, tuple.f0); // first parameter is case ID
112 row.setField(1, tuple.f1); // second paramater is tracehash
113 row.setField(2, f.format(tuple.f2)); // third paramater is tracehash
114 return row;
115 });
116
117 rows.writeUsingOutputFormat(jdbcOutput);
118
119 env.execute();
120
121 }
122 }
我现在的问题是只有在我的工作停止时才插入值(准确地说,当我从 apache flink 仪表板取消我的工作时)。
所以我的问题如下:我错过了什么吗?我应该在某处提交我插入的行吗?
最好的问候, 伊格内修斯
【问题讨论】:
-
JDBCOutputFormat 批量写入值;默认大小为 5000。您可以通过调用 setBatchInterval() 在 buildJDBCOutputFormat 块中控制此参数。如果您的作业的输入小于间隔,则仅在接收器关闭时提交批处理,即作业终止时。
-
您好,您的评论实际上是我问题的答案。我在第 106 行添加了“.setBatchInterval(1)”,它非常有效。非常感谢
标签: apache-flink flink-streaming