【发布时间】:2018-06-06 16:54:46
【问题描述】:
我编写了代码,将 CSV 文件从 GCS 注入到 BigQuery,其中包含硬编码的 ProjectID、数据集、表名称、GCS Temp 和暂存位置。
我正在寻找应该阅读的代码
- 项目ID
- 数据集
- 表名
- GCS 温度和分段位置参数
来自BigQuery table(Dynamic parameters)。
代码:-
public class DemoPipeline {
public static TableReference getGCDSTableReference() {
TableReference ref = new TableReference();
ref.setProjectId("myprojectbq");
ref.setDatasetId("DS_Emp");
ref.setTableId("emp");
return ref;
}
static class TransformToTable extends DoFn<String, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) {
String input = c.element();
String[] s = input.split(",");
TableRow row = new TableRow();
row.set("id", s[0]);
row.set("name", s[1]);
c.output(row);
}
}
public interface MyOptions extends PipelineOptions {
/*
* Param
*
*/
}
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
options.setTempLocation("gs://demo-xxxxxx/temp");
Pipeline p = Pipeline.create(options);
PCollection<String> lines = p.apply("Read From Storage", TextIO.read().from("gs://demo-xxxxxx/student.csv"));
PCollection<TableRow> rows = lines.apply("Transform To Table",ParDo.of(new TransformToTable()));
rows.apply("Write To Table",BigQueryIO.writeTableRows().to(getGCDSTableReference())
//.withSchema(BQTableSemantics.getGCDSTableSchema())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
p.run();
}
}
【问题讨论】:
-
我不太明白这个问题。您想使用 BigQuery 作为源,并根据您从其他源处理的元素从特定表和/或数据集加载吗?或者将其用作接收器并根据您从其他来源处理的元素写入特定表和/或数据集?
-
感谢亚历克斯的回复。我的要求是将 CSV 文件从 GCS 加载到 BigQuery,而无需在 Java 代码中硬编码项目 ID/数据集/表名称。我想从外部存储或动态参数(模板)中读取这些参数。请多多指教。
-
@Kannan 只需使用配置文件
-
@Haris Nadeem ,如果您提供一些示例以及如何从 GCS 读取配置文件,将不胜感激。我的要求是从 GCS 读取源 CSV 文件并与来自 GCS 的配置 CSV 文件(我将维护列名)进行比较,然后将其加载到 Bigquery 中。提前致谢。
-
您可以在此处找到配置文件的示例:mkyong.com/java/java-properties-file-examples,然后您只需将配置文件与您的工作打包
标签: google-cloud-platform google-cloud-dataflow apache-beam