【问题标题】:Convert a list into a PCollection将列表转换为 PCollection
【发布时间】:2021-05-15 21:29:20
【问题描述】:

我目前有一个DoFn,它查看一个存储桶并查看该存储桶和目录前缀中的所有文件。这个DoFn 返回一个列表而不是PCollection。如何将此列表转换为PCollection 可以被DoFn ConvertFileNames 使用?

  # List all the files within a subdir 
  class ListBlobs(beam.DoFn):
    def start_bundle(self):
      self.storage_client = storage.Client()

    def process(self, prefix):
      bucket = self.storage_client.bucket('xxx')
      return list(self.bucket.list_blobs(prefix=prefix))

  # Convert Blobs into filenames as patterns
  class ConvertFileNames(beam.DoFn):
    def process(self, blob):
      return 'gs://' + blob.bucket.name + blob.name

【问题讨论】:

    标签: python-3.x google-cloud-storage apache-beam dataflow


    【解决方案1】:

    beam documentation 中所述,Beam DoFn 的 process 方法返回一个可迭代的元素以放置到下游 PCollection 中。所以,在你的例子中,如果我有一个前缀的 PCollection,称之为prefix_pcoll,那么我可以写

    blobs_pcoll = prefix_pcoll | beam.ParDo(ListBlobs())
    

    blobs_pcoll 将包含带有此前缀的 blob 列表(即,list(self.bucket.list_blobs(prefix=prefix)) 在所有前缀上的串联)。然后你可以写

    converted = blobs_pcoll | beam.ParDo(ConvertFileNames())
    

    你也可以写

    converted = blobs_pcoll | beam.Map(
        lambda blob: 'gs://' + blob.bucket.name + blob.name)
    

    您可能还想查看apache_beam.io.fileio.MatchAll

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-12-31
      • 2021-12-14
      • 2023-02-13
      • 2023-02-03
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多