【问题标题】:Google Cloud Dataflow: Submitted job is executing but using old codeGoogle Cloud Dataflow:提交的作业正在执行但使用旧代码
【发布时间】:2017-01-05 13:33:55
【问题描述】:

我正在编写一个应该做 3 件事的 Dataflow 管道:

  • 从 GCP 存储读取 .csv 文件
  • 将数据解析为 BigQuery 兼容的 TableRows
  • 将数据写入 BigQuery 表

到目前为止,这一切都像一个魅力。它仍然存在,但是当我更改源变量和目标变量时,没有任何变化。实际运行的作业是旧作业,而不是最近更改(和提交)的代码。不知何故,当我使用 BlockingDataflowPipelineRunner 从 Eclipse 运行代码时,代码本身并未上传,而是使用了旧版本。

代码通常没有问题,但要尽可能完整:

public class BatchPipeline {
    String source = "gs://sourcebucket/*.csv";
    String destination = "projectID:datasetID.testing1";    

    //Creation of the pipeline with default arguments
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

    PCollection<String> line = p.apply(TextIO.Read.named("ReadFromCloudStorage")
            .from(source));

    @SuppressWarnings("serial")
    PCollection<TableRow> tablerows = line.apply(ParDo.named("ParsingCSVLines").of(new DoFn<String, TableRow>(){
        @Override
        public void processElement(ProcessContext c){
             //processing code goes here
        }
    }));

    //Defining the BigQuery table scheme
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("datetime").setType("TIMESTAMP").setMode("REQUIRED"));
    fields.add(new TableFieldSchema().setName("consumption").setType("FLOAT").setMode("REQUIRED"));
    fields.add(new TableFieldSchema().setName("meterID").setType("STRING").setMode("REQUIRED"));
    TableSchema schema = new TableSchema().setFields(fields);
    String table = destination;

    tablerows.apply(BigQueryIO.Write
            .named("BigQueryWrite")
            .to(table)
            .withSchema(schema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withoutValidation());

    //Runs the pipeline
    p.run();
}

出现这个问题是因为我刚刚更换了笔记本电脑并且必须重新配置所有内容。我正在开发一个干净的 Ubuntu 16.04 LTS 操作系统,并安装了 GCP 开发的所有依赖项(通常)。通常一切都配置得很好,因为我可以开始工作(如果我的配置出错,这应该是不可能的,对吧?)。顺便说一句,我正在使用 Eclipse Neon。

那么问题出在哪里?在我看来,上传代码有问题,但我已确保我的云 git 存储库是最新的,并且暂存桶已被清理...

**** 更新 ****

我从来没有发现到底出了什么问题,但是当我检查我部署的 jar 中文件的创建日期时,我确实看到它们从未真正更新过。然而,jar 文件本身有一个最近的时间戳,这让我完全忽略了这个问题(新手错误)。

我最终通过简单地在 Eclipse 中创建一个新的 Dataflow 项目并将我的 .java 文件从损坏的项目复制到新项目中来让这一切再次正常工作。从那时起,一切都像魅力一样运作。

【问题讨论】:

  • 您是否在运行前验证了暂存桶是空的,并且在运行时填充了一个新的(带时间戳的)jar?
  • 是的,我已经尝试过使用新的空桶。工作完成后,它填充了新的 jar 文件,但我的新代码仍然没有执行......不知何故 Dataflow 没有收到我无法理解的实际新编写的代码。

标签: java eclipse google-cloud-platform google-cloud-dataflow


【解决方案1】:

提交 Dataflow 作业后,您可以通过检查属于作业描述一部分的文件来检查哪些工件是作业规范的一部分,该文件可通过DataflowPipelineWorkerPoolOptions#getFilesToStage 获得。下面的代码 sn-p 给出了如何获取这些信息的一个小示例。

PipelineOptions myOptions = ...
myOptions.setRunner(DataflowPipelineRunner.class);
Pipeline p = Pipeline.create(myOptions);

// Build up your pipeline and run it.
p.apply(...)
p.run();

// At this point in time, the files which were staged by the 
// DataflowPipelineRunner will have been populated into the
// DataflowPipelineWorkerPoolOptions#getFilesToStage
List<String> stagedFiles = myOptions.as(DataflowPipelineWorkerPoolOptions.class).getFilesToStage();
for (String stagedFile : stagedFiles) {
  System.out.println(stagedFile);
}

上面的代码应该打印出如下内容:

/my/path/to/file/dataflow.jar
/another/path/to/file/myapplication.jar
/a/path/to/file/alibrary.jar

您上传的作业的资源部分可能在某种程度上已过期,其中包含您的旧代码。查看暂存列表的所有目录和 jar 部分,找到 BatchPipeline 的所有实例并验证它们的年龄。可以使用jar 工具或任何zip 文件阅读器提取jar 文件。或者使用javap 或任何其他class file inspector 来验证BatchPipeline 类文件是否与您所做的预期更改一致。

【讨论】:

  • 我对 Dataflow 还很陌生,所以我是否有这方面的指南?我真的不知道如何开始调试这个问题......如果你有一些建议,拍摄,因为我可以使用一些体面的方法/技巧,以便将来能够自己调试这些问题。
  • 您是想说我的回答确实解决了您的问题,并且您想要关于如何调试未来问题的一般指导,还是您想说我的回答没有足够的细节给您可以试试吗?
  • 我确实可以使用一些指导来调试我的代码,因为我对此还是很陌生。你的解决方案可能会解决这个问题,但老实说,我真的不知道如何开始实施它......所以如果这对你来说不是太麻烦,我真的很感激你的答案的更详细版本。提前致谢!
  • 感谢您提供更详细的答案!尽管我没有找到确切的问题,但一切都恢复了(请参阅上面的更新)。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-03-07
  • 2019-02-21
  • 1970-01-01
  • 2016-05-31
  • 2019-05-19
  • 2015-02-21
相关资源
最近更新 更多