Posted: 2021-05-15 21:29:20
Question:
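I currently have a DoFn that looks at a bucket and lists all the files in that bucket under a directory prefix. This DoFn returns a list rather than a PCollection. How can I convert this list into a PCollection that the DoFn ConvertFileNames can consume?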
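import apache_beam as beam
from google.cloud import storage

# List all the files within a subdir
class ListBlobs(beam.DoFn):
    def start_bundle(self):
        # Create the GCS client once per bundle rather than once per element
        self.storage_client = storage.Client()

    def process(self, prefix):
        bucket = self.storage_client.bucket('xxx')
        return list(bucket.list_blobs(prefix=prefix))

# Convert Blobs into filenames as patterns
class ConvertFileNames(beam.DoFn):
    def process(self, blob):
        # Yield one string; returning a bare string would make Beam emit it character by character
        yield 'gs://' + blob.bucket.name + '/' + blob.name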
Tags: python-3.x google-cloud-storage apache-beam dataflow