【发布时间】:2021-05-12 15:04:19
【问题描述】:
查看this 问题上的代码,我希望能够创建一个数据流管道,该管道可以查看特定 gcs 存储桶文件夹中的所有文件,并根据以下方面说明具有最大数据量的最终子目录字节。我会编写类似于:
的代码class SortFiles(beam.DoFn):
def __init__(self, gfs):
self.gfs = gfs
def process(self, file_metadata):
if file_metadata.size_in_bytes > 0:
# Sort the files here?
class SortFolders(beam.DoFn):
def __init__(self, gfs):
self.gfs = gfs
def process(self, file_metadata):
if file_metadata.size_in_bytes > 0:
# Sort the folders here based on maximum addition of a combination
# of the file sizes and file numbers
def delete_empty_files():
options = PipelineOptions(...)
gfs = gcs.GCSFileSystem(pipeline_options)
p = beam.Pipeline(options=pipeline_options)
discover_empty = p | 'Filenames' >> beam.Create(gfs.match(gs_folder).metadata_list)
| 'Reshuffle' >> beam.Reshuffle()
| 'SortFilesbySize' >> beam.ParDo(SortFiles(gfs))
| 'SortFoldersbySize' >> beam.ParDo(SortFolders(gfs))
| 'OutputFolders' >> ...
我还没有决定是按字节总数还是其中的文件总数列出文件夹。我将如何解决这个问题?另一个问题在于我希望能够找到最终的子目录,而不是这个任务的父文件夹。
【问题讨论】:
标签: python-3.x google-cloud-storage google-cloud-dataflow apache-beam dataflow