【Question Title】: Cloud Pub/Sub to GCS, write per element (Dataflow Pipeline)
【Posted】: 2018-12-02 04:00:51
【Question Description】:
【Question Discussion】:
Tags:
google-cloud-storage
google-cloud-dataflow
publish-subscribe
apache-beam
【Solution 1】:
If you need one file per message, one option is to write a simple DoFn that uploads each element to GCS itself:
package com.myapp.dataflow.transform;

import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.util.UUID;
import org.apache.beam.sdk.transforms.DoFn;

public class StringToGcsFile extends DoFn<String, Blob> {
    // Created in @Setup on each worker instead of being serialized with the DoFn
    private transient Storage storage;
    private final String bucketName = "my-bucket";

    @Setup
    public void setup() {
        storage = StorageOptions.getDefaultInstance().getService();
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // One object per element: a constant name would overwrite the same object every time
        String blobName = UUID.randomUUID().toString() + ".txt";
        // Upload the element as a text blob to the bucket
        BlobId blobId = BlobId.of(bucketName, blobName);
        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
        Blob blob = storage.create(blobInfo, c.element().getBytes(UTF_8));
        c.output(blob);
    }
}
Maven dependency:
<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-storage</artifactId>
    <version>1.35.0</version>
</dependency>
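The object name is the part of this approach that needs a real strategy. Below is a minimal, stdlib-only sketch of two options; the `ObjectNames` class, its methods and the `events` prefix are hypothetical, not part of any library. A random UUID is simplest, but Dataflow may retry a bundle and reprocess an element, in which case a random name produces a second object. A name derived from a SHA-256 hash of the payload maps the same payload to the same object, so a retry overwrites rather than duplicates.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;

/** Hypothetical helper for choosing one GCS object name per Pub/Sub message. */
final class ObjectNames {

    private ObjectNames() {}

    /** Random name: unique per call, so a retried element yields an extra object. */
    static String randomName(String prefix) {
        return prefix + "/" + UUID.randomUUID() + ".txt";
    }

    /** Content-hash name: identical payloads always map to the same object name. */
    static String contentHashName(String prefix, String payload) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(payload.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                // Two lowercase hex digits per byte (negative bytes are printed unsigned)
                hex.append(String.format("%02x", b));
            }
            return prefix + "/" + hex + ".txt";
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints events/2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824.txt
        System.out.println(contentHashName("events", "hello"));
        System.out.println(randomName("events"));
    }
}
```

Inside `processElement`, the placeholder name would then become something like `ObjectNames.contentHashName("events", c.element())`. If two distinct messages can carry identical payloads, their hash-based names collide, so in that case hash something unique to each message instead of the payload.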
【Solution 3】:
I implemented the same thing with a ParDo whose processElement method writes to GCS directly.
Sample code below.
Pipeline step:
pipeline_object.apply("Manually write events to GCS", ParDo.of(new Write_to_GCS()));
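The DoFn and its processElement method (a nested class; it needs the com.google.cloud.storage classes, Beam's DoFn and KV, and a static import of StandardCharsets.UTF_8):
@SuppressWarnings("serial")
static class Write_to_GCS extends DoFn<KV<String, String>, Void> {
    // Storage client created once per DoFn instance rather than once per element
    private transient Storage storage;

    @Setup
    public void setup() {
        storage = StorageOptions.getDefaultInstance().getService();
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Fetch the text you need to write into the file
        String output_string = c.element().getValue();
        // GCS_BUCKET_NAME and STORAGE_FILE_PATH are constants defined elsewhere (not shown);
        // if STORAGE_FILE_PATH never changes, each element replaces the previous object
        BlobId blobId = BlobId.of(GCS_BUCKET_NAME, STORAGE_FILE_PATH);
        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
        // Upload the text as a blob; nothing is emitted downstream
        storage.create(blobInfo, output_string.getBytes(UTF_8));
    }
}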
You need to include the following dependency in your pom.xml:
<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-storage</artifactId>
    <version>1.37.1</version>
</dependency>
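This DoFn creates a Cloud Storage service object and, for each element, writes the element's value as a text blob to the specified bucket and path.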
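Because the path here is a constant, every element is uploaded under the same object name, and by default each upload replaces the live object. The sketch below illustrates this without touching GCS: a `LinkedHashMap` stands in for the bucket, and `FixedPathDemo`, `kv` and the `events/...` names are hypothetical. It assumes the KV keys are unique per message, so putting the key into the path yields one object per element.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical, GCS-free simulation: a map plays the bucket, put() plays storage.create(). */
final class FixedPathDemo {

    /** Builds a (key, value) element like the KV<String, String> the DoFn receives. */
    static Map.Entry<String, String> kv(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    /** Writes each element's value under the name chosen by namer; returns the simulated bucket. */
    static Map<String, String> writeAll(List<Map.Entry<String, String>> elements,
                                        Function<Map.Entry<String, String>, String> namer) {
        Map<String, String> bucket = new LinkedHashMap<>();
        for (Map.Entry<String, String> element : elements) {
            // Writing to an existing name replaces the earlier contents
            bucket.put(namer.apply(element), element.getValue());
        }
        return bucket;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> elements = Arrays.asList(
                kv("msg-1", "first"), kv("msg-2", "second"), kv("msg-3", "third"));

        // Constant path: only the last element survives
        Map<String, String> fixed = writeAll(elements, e -> "events/output.txt");
        // Path derived from the KV key: one object per element
        Map<String, String> perKey = writeAll(elements, e -> "events/" + e.getKey() + ".txt");

        System.out.println(fixed);  // prints {events/output.txt=third}
        System.out.println(perKey); // prints {events/msg-1.txt=first, events/msg-2.txt=second, events/msg-3.txt=third}
    }
}
```

In the real DoFn the equivalent change would be building the `BlobId` from something per-element (such as the key) instead of the constant path alone.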