【发布时间】:2017-01-05 13:33:55
【问题描述】:
我正在编写一个应该做 3 件事的 Dataflow 管道:
- 从 GCP 存储读取 .csv 文件
- 将数据解析为 BigQuery 兼容的 TableRows
- 将数据写入 BigQuery 表
到目前为止,这一切都像一个魅力。它仍然存在,但是当我更改源变量和目标变量时,没有任何变化。实际运行的作业是旧作业,而不是最近更改(和提交)的代码。不知何故,当我使用 BlockingDataflowPipelineRunner 从 Eclipse 运行代码时,代码本身并未上传,而是使用了旧版本。
代码通常没有问题,但要尽可能完整:
public class BatchPipeline {
String source = "gs://sourcebucket/*.csv";
String destination = "projectID:datasetID.testing1";
//Creation of the pipeline with default arguments
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
PCollection<String> line = p.apply(TextIO.Read.named("ReadFromCloudStorage")
.from(source));
@SuppressWarnings("serial")
PCollection<TableRow> tablerows = line.apply(ParDo.named("ParsingCSVLines").of(new DoFn<String, TableRow>(){
@Override
public void processElement(ProcessContext c){
//processing code goes here
}
}));
//Defining the BigQuery table scheme
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("datetime").setType("TIMESTAMP").setMode("REQUIRED"));
fields.add(new TableFieldSchema().setName("consumption").setType("FLOAT").setMode("REQUIRED"));
fields.add(new TableFieldSchema().setName("meterID").setType("STRING").setMode("REQUIRED"));
TableSchema schema = new TableSchema().setFields(fields);
String table = destination;
tablerows.apply(BigQueryIO.Write
.named("BigQueryWrite")
.to(table)
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withoutValidation());
//Runs the pipeline
p.run();
}
出现这个问题是因为我刚刚更换了笔记本电脑并且必须重新配置所有内容。我正在开发一个干净的 Ubuntu 16.04 LTS 操作系统,并安装了 GCP 开发的所有依赖项(通常)。通常一切都配置得很好,因为我可以开始工作(如果我的配置出错,这应该是不可能的,对吧?)。顺便说一句,我正在使用 Eclipse Neon。
那么问题出在哪里?在我看来,上传代码有问题,但我已确保我的云 git 存储库是最新的,并且暂存桶已被清理...
**** 更新 ****
我从来没有发现到底出了什么问题,但是当我检查我部署的 jar 中文件的创建日期时,我确实看到它们从未真正更新过。然而,jar 文件本身有一个最近的时间戳,这让我完全忽略了这个问题(新手错误)。
我最终通过简单地在 Eclipse 中创建一个新的 Dataflow 项目并将我的 .java 文件从损坏的项目复制到新项目中来让这一切再次正常工作。从那时起,一切都像魅力一样运作。
【问题讨论】:
-
您是否在运行前验证了暂存桶是空的,并且在运行时填充了一个新的(带时间戳的)jar?
-
是的,我已经尝试过使用新的空桶。工作完成后,它填充了新的 jar 文件,但我的新代码仍然没有执行......不知何故 Dataflow 没有收到我无法理解的实际新编写的代码。
标签: java eclipse google-cloud-platform google-cloud-dataflow