【问题标题】:Dataflow Pipeline - Using dynamic param or query数据流管道 - 使用动态参数或查询
【发布时间】:2021-02-08 06:49:34
【问题描述】:

我正在尝试创建一个数据流管道模板,这需要我从 bigquery 中读取数据。所以我需要使用Instant.now() 使我的查询动态化,但似乎在创建模板时查询被锁定

  Some Code HERE
  Some Code HERE
  Some Code HERE
 
  pipeline.apply("ReadFromBigQuery",
      BigQueryIO.read(new DataTransformer(MyCustomObject.getQuery()))
          .fromQuery(spec.getQuery())
          .usingStandardSql()
          .withQueryLocation("US")
          .withoutValidation()
  ).apply("do Something 1",
      Combine.globally(new CombineIterableAccumulatorFn<MyCustomObject2>())
  ).apply("do Something 2",
      ParDo.of(new SendToKenshoo(param, param2)
  );

我的查询是这样的

SELECT * FROM `my-project-id.my-dataset.my-view` where PARTITIONTIME between TIMESTAMP('@currentDate') and  TIMESTAMP('@tomorrowDate')

需要使用 Instant.now() 或任何时间函数替换 @currentDate 和 @tomorrowDate 请举个例子

注意:我需要更改代码上的日期,而不是像这样在查询级别上更改日期

SELECT * FROM `my-project-id.my-dataset.my-view` where PARTITIONTIME between DATE_ADD(CURRENT_DATE(), INTERVAL -1 DAY) and  CURRENT_DATE()

【问题讨论】:

    标签: java google-bigquery google-cloud-dataflow apache-beam


    【解决方案1】:

    我不确定您如何将这些参数发送到查询(通过值提供程序等)。但是,我不建议为此使用模板,因为您需要动态输入。如果你想这样做,我会使用 Flex 模板:https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-07-22
      • 2018-08-01
      相关资源
      最近更新 更多