【问题标题】:Write into cloud sql using dataflow JdbcIO api使用数据流 JdbcIO api 写入云 sql
【发布时间】:2018-12-13 01:49:01
【问题描述】:

我有一个要求,我必须使用 Cloud Dataflow API 将字符串的 PCollection 写入 Cloud SQL。

pipeline.apply(TextIO.read().from("gs://***/sampleBigtable.csv"))
    .apply(JdbcIO.write()
    .withDataSourceConfiguration(DataSourceConfiguration
    .create("org.postgresql.Driver", "jdbc:postgresql://***:5432/test")
    .withUsername("**").withPassword("password10"))
    .withStatement("insert into person values(?,?)")
    .withPreparedStatementSetter(
 new JdbcIO.PreparedStatementSetter < Object > () {
  /**
   * 
   */
  private static final long serialVersionUID = 1 L;

  @Override
  public void setParameters(Object arg0, PreparedStatement query)
  throws Exception {
   // TODO Auto-generated method stub
   query.setString(1, "Hello");
   query.setString(1, "Hi");
  }
 }));

这是我正在尝试的示例代码。我想做的一个非常简单的版本。

另外,是否可以使用 parDo 从 Dataflow 写入 Cloud SQL 并编写简单的插入语句?

【问题讨论】:

    标签: google-cloud-platform google-cloud-dataflow google-cloud-sql


    【解决方案1】:

    之前的转换输出PCollection&lt;String&gt;,因此您需要指定JdbcIO&lt;T&gt;.write()的输入类型

    类似这样的:

        pipeline
            .apply(TextIO.read().from("gs://***/sampleBigtable.csv"))
            .apply(JdbcIO.<String>write().withDataSourceConfiguration(
                DataSourceConfiguration.create("org.postgresql.Driver","jdbc:postgresql://***:5432/test")
                    .withUsername("**")
                    .withPassword("password10"))
                    .withStatement("insert into person values(?,?)")
                        .withPreparedStatementSetter((element, query) -> {
                            query.setInt(1, 1);
                            query.setString(2, "Hello");
                        })
            );
    

    【讨论】:

    • 知道了。效果很好。我的错。谢谢。
    猜你喜欢
    • 2019-10-22
    • 2016-11-18
    • 1970-01-01
    • 2015-08-22
    • 2020-01-25
    • 2020-03-04
    • 2022-12-12
    • 1970-01-01
    • 2017-10-10
    相关资源
    最近更新 更多