【问题标题】:Apache beam Dataflow : File Transfer from Azure to GCSApache Beam 数据流:从 Azure 到 GCS 的文件传输
【发布时间】:2021-08-03 09:02:30
【问题描述】:

我尝试将文件从 Azure 容器传输到 GCS 存储桶,但最终出现以下问题

  1. 源文件中的记录顺序与目标文件的记录顺序不同,因为管道将进行并行处理
  2. 必须编写大量自定义代码来为 GCS 目标文件提供自定义名称,因为管道为其提供默认名称。

无论如何,Apache 管道可以在不处理文件内容的情况下传输文件本身(这样就不会发生上述问题)?因为我需要将多个文件从 Azure 容器传输到 GCS 存储桶

下面是我目前用来传输文件的代码

String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("YYYY_MM_DD_HH_MM_SS3")).toString();

String connectionString = "<<AZURE_STORAGE_CONNECTION_STRING>>"; 
        
PipelineOptions options = PipelineOptionsFactory.create();
options.as(BlobstoreOptions.class).setAzureConnectionString(connectionString);
        
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("azfs://storageaccountname/containername/CSVSample.csv"))
.apply("",FileIO.<String>write().to("azfs://storageaccountname/containername/"+format+"/").withNumShards(1).withSuffix(".csv")
        .via(TextIO.sink()));
p.run().waitUntilFinish();

【问题讨论】:

    标签: java azure-blob-storage google-cloud-dataflow apache-beam


    【解决方案1】:

    您应该能够为此目的使用FileIO 转换。

    例如(未经测试的伪代码),

    FileIO.match().filepattern("azfs://storageaccountname/containername/CSVSample.csv")
    .apply(FileIO.readMatches())
    .apply(ParDo.of(new MyWriteDoFn()));
    

    MyWriteDoFn() 上方是 DoFn,它从单个文件中读取字节(使用 AzureBlobStoreFileSystem)并写入 GCS(使用 GCSFileSystem)。您可以使用 FileSystems 类中带有正确前缀的静态方法,而不是直接调用底层 FileSystem 实现的方法。

    【讨论】:

      猜你喜欢
      • 2020-06-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-07-29
      • 1970-01-01
      • 1970-01-01
      • 2020-06-22
      • 1970-01-01
      相关资源
      最近更新 更多