【问题标题】:Cloud Pub/Sub to GCS, write per element (Dataflow Pipeline)Cloud Pub/Sub 到 GCS,按元素写入(数据流管道)
【发布时间】:2018-12-02 04:00:51
【问题描述】:

您如何在每次收到来自 Pubsub 的消息时写入 GCS,它会进行窗口写入,而不是按元素写入。非常感谢有关此问题的任何提示。

示例链接 (https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubsubToText.java)

在运行此示例代码时,它会写入发送到 GCS 的 pub-sub 消息。但是当持续时间设置为 1 分钟时,它会保存所有消息,然后在一分钟后写入 1 个文件,但我希望它将每条消息写入不同的文件。

【问题讨论】:

    标签: google-cloud-storage google-cloud-dataflow publish-subscribe apache-beam


    【解决方案1】:

    如果每条消息都需要一个文件,一种选择是创建这样的简单转换:

    package com.myapp.dataflow.transform;
    
    import org.apache.beam.sdk.transforms.DoFn;
    import com.google.cloud.storage.*;
    import static java.nio.charset.StandardCharsets.UTF_8;
    
    public class StringToGcsFile extends DoFn<String, Blob> {
        private Storage storage;
        private String bucketName = "my-bucket";
    
        @Setup
        public void setup() {
            storage = StorageOptions.getDefaultInstance().getService();
        }
    
        @ProcessElement
        public void processElement(ProcessContext c) {
            // consider some strategy for object names, UUID or something
            String blobName = "my_blob_name";
    
            // Upload a blob to the bucket
            BlobId blobId = BlobId.of(bucketName, blobName);
            BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
            Blob blob = storage.create(blobInfo, c.element().getBytes(UTF_8));
    
            c.output(blob);
        }
    }
    

    Maven 依赖:

    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-storage</artifactId>
        <version>1.35.0</version>
    </dependency>
    

    【讨论】:

      【解决方案2】:

      您可以创建一个Google Cloud Function 来自动执行此操作。云函数可以是triggered 4 个不同的事件。其中之一是Pub/Sub message publishing。 如果你想测试一个例子,参考这个Pub/Sub tutorial

      您应该编写代码以正确地将每条消息重定向到所需的 GCS,例如基于 Pub/Sub 主题。

      【讨论】:

        【解决方案3】:

        我已经使用 processElement 实现了相同的功能。

        下面是示例代码。

        管道步骤:

        pipeline_object.apply("Manually write events to GCS", ParDo.of(new Write_to_GCS()));
        

        ProcessElement函数:

        @SuppressWarnings("serial")
        static class Write_to_GCS extends DoFn<KV<String, String>, TextIO.Write> {
            @ProcessElement
            public void processElement(ProcessContext c) throws JSONException {
        
                // Fetch text you need to write into file
                String output_string = c.element().getValue();
        
                // Create your service object
                Storage storage = StorageOptions.getDefaultInstance().getService();
        
                // Upload a blob to the newly created bucket
                BlobId blobId = BlobId.of(GCS_BUCKET_NAME, STORAGE_FILE_PATH);
                BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
                @SuppressWarnings("unused")
                Blob blob = storage.create(blobInfo, event_string.getBytes(UTF_8));
            }
        }
        

        您需要在 pom.xml 中包含以下依赖项

        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-storage</artifactId>
            <version>1.37.1</version>
        </dependency>
        

        这段代码的作用是创建一个 gcs 存储服务对象并将一个 blob 写入指定路径。

        【讨论】:

          猜你喜欢
          • 2019-07-11
          • 2019-09-11
          • 2020-12-15
          • 2019-12-20
          • 1970-01-01
          • 2022-01-13
          • 2019-08-18
          • 2019-07-23
          • 2019-10-03
          相关资源
          最近更新 更多