【问题标题】:How do I write a google cloud dataflow transform mapping?如何编写谷歌云数据流转换映射?
【发布时间】:2018-04-06 21:15:36
【问题描述】:

我正在将谷歌云数据流作业从 dataflow java sdk 1.8 升级到 2.4 版,然后尝试使用 --update 和 --transformNameMapping 参数更新其在谷歌云上的现有数据流作业,但我不知道如何正确编写 transformNameMappings 以使升级成功并通过兼容性检查。

我的代码在兼容性检查中失败并出现以下错误: Workflow failed. Causes: The new job is not compatible with 2018-04-06_13_48_04-12999941762965935736. The original job has not been aborted., The new job is missing steps BigQueryIO.writeTableRows/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey, PubsubIO.readStrings. If these steps have been renamed or deleted, please specify them with the update command.

现有、当前正在运行的作业的数据流转换名称是:

  1. PubsubIO.Read

  2. ParDo(ExtractJsonPath) - 我们编写的自定义函数

  3. ParDo(AddMetadata) - 我们编写的另一个自定义函数

  4. BigQueryIO.Write

在我使用 2.4 sdk 的新代码中,我更改了第 1 次和第 4 次转换/函数,因为一些库被重命名并且在新版本中弃用了一些旧 sdk 的函数。

具体的变换代码可以看下面:

1.8 SDK 版本:

     PCollection<String> streamData =
       pipeline
        .apply(PubsubIO.Read
                .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
                 //.subscription(options.getPubsubSubscription())
                .topic(options.getPubsubTopic()));
     streamData
         .apply(ParDo.of(new ExtractJsonPathFn(pathInfos)))
         .apply(ParDo.of(new AddMetadataFn()))
        .apply(BigQueryIO.Write
                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                 .to(tableRef)

我重写的2.4 SDK版本:

     PCollection<String> streamData =
       pipeline
        .apply("PubsubIO.readStrings", PubsubIO.readStrings()
                .withTimestampAttribute(PUBSUB_TIMESTAMP_LABEL_KEY)
                 //.subscription(options.getPubsubSubscription())
                .fromTopic(options.getPubsubTopic()));

     streamData
         .apply(ParDo.of(new ExtractJsonPathFn(pathInfos)))
         .apply(ParDo.of(new AddMetadataFn()))
        .apply("BigQueryIO.writeTableRows", BigQueryIO.writeTableRows()
                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                 .to(tableRef)

所以在我看来 PubsubIO.Read 应该映射到 PubsubIO.readStringsBigQueryIO.Write 应该映射到 BigQueryIO.writeTableRows。但我可能会误解这是如何工作的。

我一直在尝试各种各样的事情 - 我试图给出我未能重新映射已定义名称的这两个转换,因为它们以前没有明确命名,所以我将我的应用更新为 .apply("PubsubIO.readStrings" 和 @987654329 @ 然后将我的 transformNameMapping 参数设置为:

--transformNameMapping={\"BigQueryIO.Write\":\"BigQueryIO.writeTableRows\",\"PubsubIO.Read\":\"PubsubIO.readStrings\"}

 --transformNameMapping={\"BigQueryIO.Write/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey\":\"BigQueryIO.writeTableRows/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey\",\"PubsubIO.Read\":\"PubsubIO.readStrings\"}

甚至尝试重新映射复合变换中的所有内部变换

--transformNameMapping={\"BigQueryIO.Write/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey\":\"BigQueryIO.writeTableRows/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey\",\"BigQueryIO.Write/BigQueryIO.StreamWithDeDup/Reshuffle\":\"BigQueryIO.writeTableRows/BigQueryIO.StreamWithDeDup/Reshuffle\",\"BigQueryIO.Write/BigQueryIO.StreamWithDeDup\":\"BigQueryIO.writeTableRows/BigQueryIO.StreamWithDeDup\",\"BigQueryIO.Write\":\"BigQueryIO.writeTableRows\",\"PubsubIO.Read\":\"PubsubIO.readStrings\"}

但无论如何我似乎都会遇到同样的错误:

The new job is missing steps BigQueryIO.writeTableRows/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey, PubsubIO.readStrings.

想知道我是否做错了什么?在谁愿意分享他们使用的格式之前写过转换映射的任何人?除了关于更新数据流作业的主要谷歌文档之外,我根本找不到任何在线示例,除了最简单的案例--transformNameMapping={"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...} 之外并没有真正涵盖任何内容,并且没有使示例非常具体。

【问题讨论】:

    标签: google-cloud-platform google-cloud-dataflow


    【解决方案1】:

    事实证明,谷歌云 Web 控制台数据流作业详细信息页面的日志中有其他信息,但我错过了这些信息。我需要将日志级别从info 调整为显示any log level,然后我发现了几个步骤融合消息,例如(尽管还有更多):

     2018-04-16 (13:56:28) Mapping original step BigQueryIO.Write/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey to write/StreamingInserts/StreamingWriteTables/Reshuffle/GroupByKey in the new graph.
     2018-04-16 (13:56:28) Mapping original step PubsubIO.Read to PubsubIO.Read/PubsubUnboundedSource in the new graph.
    

    我没有尝试将PubsubIO.Read 映射到PubsubIO.readStrings,而是需要映射到我在附加日志中提到的步骤。在这种情况下,我通过将PubsubIO.Read 映射到PubsubIO.Read/PubsubUnboundedSourceBigQueryIO.Write/BigQueryIO.StreamWithDeDupBigQueryIO.Write/StreamingInserts/StreamingWriteTables 来克服我的错误。因此,请尝试将您的旧步骤映射到日志中作业失败消息之前的完整日志中提到的步骤。

    很遗憾,由于使用的编码器从旧代码更改为新代码,导致兼容性检查失败,但我的 missing step 错误已解决。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-08-10
      • 1970-01-01
      • 2021-08-16
      • 1970-01-01
      • 1970-01-01
      • 2020-07-23
      • 2019-02-06
      • 2019-11-21
      相关资源
      最近更新 更多