【Posted】: 2018-05-10 06:32:15
【Question】:
When I pass the staging, temp, and output GCS bucket locations as parameters, the Dataflow job fails with the following exception.
Java code:
final String[] used = Arrays.copyOf(args, args.length + 1);
used[used.length - 1] = "--project=OVERWRITTEN";
final T options =
    PipelineOptionsFactory.fromArgs(used).withValidation().as(clazz);
options.setProject(PROJECT_ID);
options.setStagingLocation("gs://abc/staging/");
options.setTempLocation("gs://abc/temp");
options.setRunner(DataflowRunner.class);
options.setGcpTempLocation("gs://abc");
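The same locations can also be supplied as command-line style flags before the options are parsed, instead of being set through setters afterwards. This is a minimal sketch, assuming the `gs://abc` paths from the question; the class `DataflowArgs`, the helper `withDataflowArgs`, and the project id `my-project` are hypothetical. The resulting array would then be passed to `PipelineOptionsFactory.fromArgs(...)` as in the snippet above.

```java
import java.util.Arrays;

public class DataflowArgs {

    // Hypothetical helper: appends the project, GCS locations and runner
    // from the question as flags, rather than calling the setters later.
    static String[] withDataflowArgs(String[] args, String projectId) {
        String[] extra = {
            "--project=" + projectId,
            "--stagingLocation=gs://abc/staging/",
            "--tempLocation=gs://abc/temp",
            "--gcpTempLocation=gs://abc",
            "--runner=DataflowRunner"
        };
        // Copy the caller's args, then append the extra flags at the end.
        String[] used = Arrays.copyOf(args, args.length + extra.length);
        System.arraycopy(extra, 0, used, args.length, extra.length);
        return used;
    }

    public static void main(String[] args) {
        // "my-project" is a placeholder project id (assumption).
        String[] used = withDataflowArgs(args, "my-project");
        // These would then be parsed with
        // PipelineOptionsFactory.fromArgs(used).withValidation().as(...)
        System.out.println(String.join(" ", used));
    }
}
```

Printing the array this way also gives the full command line that the first comment asks for.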
Error:
INFO: Staging pipeline description to gs://ups-heat-dev-tmp/mniazstaging_ingest_validation/staging/
May 10, 2018 11:56:35 AM org.apache.beam.runners.dataflow.util.PackageUtil tryStagePackage
INFO: Uploading <42088 bytes, hash E7urYrjAOjwy6_5H-UoUxA> to gs://ups-heat-dev-tmp/mniazstaging_ingest_validation/staging/pipeline-E7urYrjAOjwy6_5H-UoUxA.pb
Dataflow SDK version: 2.4.0
May 10, 2018 11:56:38 AM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: Printed job specification to gs://ups-heat-dev-tmp/mniazstaging_ingest_validation/templates/DataValidationPipeline
May 10, 2018 11:56:40 AM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: Template successfully created.
Exception in thread "main" java.lang.NullPointerException
at org.apache.beam.runners.dataflow.DataflowPipelineJob.getJobWithRetries(DataflowPipelineJob.java:501)
at org.apache.beam.runners.dataflow.DataflowPipelineJob.getStateWithRetries(DataflowPipelineJob.java:477)
at org.apache.beam.runners.dataflow.DataflowPipelineJob.waitUntilFinish(DataflowPipelineJob.java:312)
at org.apache.beam.runners.dataflow.DataflowPipelineJob.waitUntilFinish(DataflowPipelineJob.java:248)
at org.apache.beam.runners.dataflow.DataflowPipelineJob.waitUntilFinish(DataflowPipelineJob.java:202)
at org.apache.beam.runners.dataflow.DataflowPipelineJob.waitUntilFinish(DataflowPipelineJob.java:195)
at com.example.DataValidationPipeline.main(DataValidationPipeline.java:66)
【Comments】:
-
Would you mind updating it with the full command line you used?
-
I'm running it from Eclipse and setting the parameters in code.
-
final String[] used = Arrays.copyOf(args, args.length + 1); used[used.length - 1] = "--project=OVERWRITTEN"; final T options = PipelineOptionsFactory.fromArgs(used).withValidation().as(clazz); options.setProject(PROJECT_ID); options.setStagingLocation("gs://abc/staging/"); options.setTempLocation("gs://abc/temp"); options.setRunner(DataflowRunner.class); options.setGcpTempLocation("gs://abc");
-
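Hi, would you mind providing more context? Could you share the full code and your pom.xml, so we can see which dependency versions you are using?
-
@MohammedNiaz - Hi, was the issue resolved? If so, could you share the solution?
Tags: google-cloud-platform google-cloud-dataflow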