Posted: 2018-04-06 21:15:36
Question:
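I'm upgrading a Google Cloud Dataflow job from Dataflow Java SDK 1.8 to version 2.4. I'm then trying to update the existing job running on Google Cloud with the --update and --transformNameMapping arguments, but I can't work out how to write the transformNameMapping so that the upgrade succeeds and passes the compatibility check.
My code fails the compatibility check with the following error: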
Workflow failed. Causes: The new job is not compatible with 2018-04-06_13_48_04-12999941762965935736. The original job has not been aborted., The new job is missing steps BigQueryIO.writeTableRows/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey, PubsubIO.readStrings. If these steps have been renamed or deleted, please specify them with the update command.
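Every entry in this message is a full slash-separated step path (composite name, then inner step), using the new job's naming, e.g. BigQueryIO.writeTableRows/.../GroupByKey. When you are iterating on mappings, it can help to pull that list out of the message and compare it against the keys and values you passed. The following is a minimal sketch. The class name is hypothetical, and it assumes the message always has the "missing steps A, B." shape quoted in this question. That shape is not a documented format.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: extracts step paths from an update-compatibility error message. */
final class MissingStepsParser {
    private static final String MARKER = "missing steps ";

    private MissingStepsParser() {}

    /**
     * Returns the comma-separated step paths that follow "missing steps ".
     * Assumes the list ends at ". If these steps" or at a final period,
     * the two shapes seen in the messages quoted in this question.
     */
    static List<String> parse(String message) {
        int start = message.indexOf(MARKER);
        if (start < 0) {
            return new ArrayList<>(); // no step list found in this message
        }
        start += MARKER.length();
        int end = message.indexOf(". If these steps", start);
        String list = end >= 0 ? message.substring(start, end) : message.substring(start).trim();
        if (list.endsWith(".")) {
            list = list.substring(0, list.length() - 1); // drop the sentence-ending period
        }
        // Step names contain dots and slashes but not ", ", so split on commas only.
        return new ArrayList<>(Arrays.asList(list.split(",\\s*")));
    }
}
```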
The Dataflow transform names of the existing, currently running job are:
PubsubIO.Read
ParDo(ExtractJsonPath) - a custom function we wrote
ParDo(AddMetadata) - another custom function we wrote
BigQueryIO.Write
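In my new code using the 2.4 SDK, I changed the 1st and 4th transforms, because some libraries were renamed and some of the old SDK's functions are deprecated in the new version.
The specific transform code is below.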
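1.8 SDK version:
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;

PCollection<String> streamData =
    pipeline
        // Unnamed, so the running job shows this step as "PubsubIO.Read"
        .apply(PubsubIO.Read
            .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
            //.subscription(options.getPubsubSubscription())
            .topic(options.getPubsubTopic()));
streamData
    .apply(ParDo.of(new ExtractJsonPathFn(pathInfos)))
    .apply(ParDo.of(new AddMetadataFn()))
    // Unnamed, so the running job shows this step as "BigQueryIO.Write"
    .apply(BigQueryIO.Write
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .to(tableRef));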
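My rewritten 2.4 SDK version:
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

PCollection<String> streamData =
    pipeline
        // Explicitly named so the step can be referenced in --transformNameMapping
        .apply("PubsubIO.readStrings", PubsubIO.readStrings()
            .withTimestampAttribute(PUBSUB_TIMESTAMP_LABEL_KEY)
            //.fromSubscription(options.getPubsubSubscription())
            .fromTopic(options.getPubsubTopic()));
streamData
    .apply(ParDo.of(new ExtractJsonPathFn(pathInfos)))
    .apply(ParDo.of(new AddMetadataFn()))
    // Explicitly named so the step can be referenced in --transformNameMapping
    .apply("BigQueryIO.writeTableRows", BigQueryIO.writeTableRows()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .to(tableRef));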
So it seems to me that PubsubIO.Read should map to PubsubIO.readStrings, and BigQueryIO.Write should map to BigQueryIO.writeTableRows. But I may be misunderstanding how this works.
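The flag's value is a single JSON object. Its keys are step names in the running job, and its values are the corresponding names in the replacement job. The following minimal sketch renders such a map with proper quoting, so no hand-written backslashes are needed. The class name is hypothetical, and the two pairs are only the ones proposed above. Whether they are the right pairs is exactly the open question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: renders an old-name -> new-name map as a JSON object string. */
final class TransformNameMappingJson {
    private TransformNameMappingJson() {}

    /** Wraps s in double quotes, escaping the characters JSON forbids inside a string. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control chars
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    /** Builds {"old1":"new1","old2":"new2"} in the map's iteration order. */
    static String toJson(Map<String, String> oldToNew) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : oldToNew.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new LinkedHashMap<>(); // keeps insertion order
        mapping.put("PubsubIO.Read", "PubsubIO.readStrings");
        mapping.put("BigQueryIO.Write", "BigQueryIO.writeTableRows");
        System.out.println(toJson(mapping));
    }
}
```

The printed string is the raw value. If it is passed through a shell or build tool, it still has to be quoted for that layer.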
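I've tried all sorts of things. The two transforms I couldn't remap had not been explicitly named before, so I first gave them defined names by changing my applies to .apply("PubsubIO.readStrings", ...) and .apply("BigQueryIO.writeTableRows", ...). I then set my transformNameMapping parameter to: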
--transformNameMapping={\"BigQueryIO.Write\":\"BigQueryIO.writeTableRows\",\"PubsubIO.Read\":\"PubsubIO.readStrings\"}
or
--transformNameMapping={\"BigQueryIO.Write/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey\":\"BigQueryIO.writeTableRows/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey\",\"PubsubIO.Read\":\"PubsubIO.readStrings\"}
I even tried remapping every inner transform of the composite transform:
--transformNameMapping={\"BigQueryIO.Write/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey\":\"BigQueryIO.writeTableRows/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey\",\"BigQueryIO.Write/BigQueryIO.StreamWithDeDup/Reshuffle\":\"BigQueryIO.writeTableRows/BigQueryIO.StreamWithDeDup/Reshuffle\",\"BigQueryIO.Write/BigQueryIO.StreamWithDeDup\":\"BigQueryIO.writeTableRows/BigQueryIO.StreamWithDeDup\",\"BigQueryIO.Write\":\"BigQueryIO.writeTableRows\",\"PubsubIO.Read\":\"PubsubIO.readStrings\"}
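The third attempt is a mechanical prefix rewrite: every step path under BigQueryIO.Write becomes the same path under BigQueryIO.writeTableRows. The following sketch performs that rewrite. It assumes you already have the old job's full step paths. The list in the usage below is hypothetical and copied from the attempts above; the real paths would have to be read from the running job's graph. Also note that a 1.8 composite and a 2.4 composite need not contain the same inner steps, so a prefix swap alone may not line up.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: mapping entries for a renamed composite and the steps nested under it. */
final class CompositeRenames {
    private CompositeRenames() {}

    /**
     * For each path equal to oldPrefix, or starting with oldPrefix + "/", emits an entry
     * whose value swaps that prefix for newPrefix. All other paths are skipped.
     */
    static Map<String, String> remapPrefix(List<String> oldStepPaths, String oldPrefix, String newPrefix) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String path : oldStepPaths) {
            if (path.equals(oldPrefix)) {
                out.put(path, newPrefix); // the composite itself
            } else if (path.startsWith(oldPrefix + "/")) {
                // keep everything after the prefix, including the leading "/"
                out.put(path, newPrefix + path.substring(oldPrefix.length()));
            }
        }
        return out;
    }
}
```

The resulting map would then be serialized as the JSON value of --transformNameMapping.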
But whatever I do, I get the same error:
The new job is missing steps BigQueryIO.writeTableRows/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey, PubsubIO.readStrings.
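Am I doing something wrong? Has anyone written a transform mapping before who would be willing to share the format they used? Apart from Google's main documentation on updating Dataflow jobs, I can't find any examples online. That documentation doesn't really cover anything beyond the simplest case, --transformNameMapping={"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}, and its example isn't very concrete.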
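Whatever escaping the launching shell or build tool needs, the value the pipeline ends up parsing is presumably plain JSON with unescaped double quotes. Assembling the arguments in Java avoids shell quoting entirely. The following sketch assumes the pipeline's main passes its arguments unchanged to PipelineOptionsFactory.fromArgs. The class name and "my-streaming-job" are placeholders, and --jobName has to match the name of the job being replaced.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: argument list for relaunching a pipeline as an update of a running job. */
final class UpdateArgs {
    private UpdateArgs() {}

    static List<String> build(String jobName, String mappingJson) {
        List<String> args = new ArrayList<>();
        args.add("--runner=DataflowRunner");
        args.add("--update");                               // replace the running job in place
        args.add("--jobName=" + jobName);                   // must match the running job's name
        args.add("--transformNameMapping=" + mappingJson);  // raw JSON, no shell escaping needed here
        return args;
    }

    public static void main(String[] unused) {
        // Java source escapes only: the runtime string is {"PubsubIO.Read":"PubsubIO.readStrings"}
        String json = "{\"PubsubIO.Read\":\"PubsubIO.readStrings\"}";
        System.out.println(build("my-streaming-job", json)); // placeholder job name
    }
}
```

If the flag is typed into a POSIX shell instead, wrapping the whole value in single quotes keeps the braces, commas and double quotes intact.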
Discussion:
Tags: google-cloud-platform google-cloud-dataflow