【问题标题】:Apache Flink: Write a DataStream to a Postgres tableApache Flink:将 DataStream 写入 Postgres 表
【发布时间】:2018-04-17 08:22:54
【问题描述】:

我正在尝试编写一个流式传输作业,它将数据流下沉到 postgres 表中。为了提供完整的信息,我的工作基于文章:https://tech.signavio.com/2017/postgres-flink-sink,它建议使用 JDBCOutputFormat。

我的代码如下所示:

98     ... 
99     String strQuery = "INSERT INTO public.alarm (entity, duration, first, type, windowsize) VALUES (?, ?, ?, 'dur', 6)";
100
101     JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
102      .setDrivername("org.postgresql.Driver")
103      .setDBUrl("jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff")
104      .setQuery(strQuery)
105      .setSqlTypes(new int[] { Types.VARCHAR, Types.INTEGER, Types.VARCHAR}) //set the types
106      .finish();
107
108     DataStream<Row> rows = FilterStream
109                 .map((tuple)-> {
110                    Row row = new Row(3);                  // our prepared statement has 3 parameters
111                    row.setField(0, tuple.f0);             // first parameter is case ID
112                    row.setField(1, tuple.f1);             // second paramater is tracehash
113                    row.setField(2, f.format(tuple.f2));   // third paramater is tracehash
114                    return row;
115                 });
116
117     rows.writeUsingOutputFormat(jdbcOutput);
118
119     env.execute();
120
121     }
122 }

我现在的问题是只有在我的工作停止时才插入值(准确地说,当我从 apache flink 仪表板取消我的工作时)。

所以我的问题如下:我错过了什么吗?我应该在某处提交我插入的行吗?

最好的问候, 伊格内修斯

【问题讨论】:

  • JDBCOutputFormat 批量写入值;默认大小为 5000。您可以通过调用 setBatchInterval() 在 buildJDBCOutputFormat 块中控制此参数。如果您的作业的输入小于间隔,则仅在接收器关闭时提交批处理,即作业终止时。
  • 您好,您的评论实际上是我问题的答案。我在第 106 行添加了“.setBatchInterval(1)”,它非常有效。非常感谢

标签: apache-flink flink-streaming


【解决方案1】:

正如 Chesnay 在 his comment 中所说,您必须调整批处理间隔。

然而,这还不是全部。如果你想获得至少一次的结果,你必须将批量写入与 Flink 的检查点同步。基本上,您必须将JdbcOutputFormat 包装在同样实现CheckpointedFunction 接口的SinkFunction 中。当调用snapshotState() 时,您已将批处理写入数据库。您可以查看此pull request,它将在下一个版本中提供此功能。

【讨论】:

    【解决方案2】:

    Fabian 的回答是实现至少一次语义的一种方法;通过将写入与 Flink 的检查点同步。但是,这样做的缺点是您的 Sink 的数据新鲜度现在与您的检查点间隔周期紧密相关。

    作为替代方案,您可以将具有(实体、持续时间、第一个)字段的元组或行存储在 Flink 自己的托管状态中,以便 Flink 负责检查点它(换句话说,使您的 Sink 的状态容错)。为此,您需要实现 CheckpointedFunction 和 CheckpointedRestoring 接口(无需将写入与检查点同步。如果您不必使用 JDBCOutputFormat,您甚至可以单独执行 SQL 插入)。请参阅:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state。另一种解决方案是仅实现 ListCheckpointed 接口(可以与已弃用的 CheckpointedRestoring 接口类似的方式使用,并支持列表式状态重新分配)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-09-20
      • 2020-02-11
      • 1970-01-01
      • 2021-10-30
      • 1970-01-01
      • 1970-01-01
      • 2021-07-14
      • 2018-09-12
      相关资源
      最近更新 更多