【问题标题】:large numpy matrix as dataflow side input大 numpy 矩阵作为数据流侧输入
【发布时间】:2019-01-23 00:33:55
【问题描述】:

我正在尝试在 Python 中编写一个需要大型 numpy 矩阵作为侧输入的数据流管道。矩阵保存在云存储中。理想情况下,每个 Dataflow 工作人员都会直接从云存储中加载矩阵。

我的理解是,如果我说matrix = np.load(LOCAL_PATH_TO_MATRIX),然后

p | "computation" >> beam.Map(computation, matrix)

矩阵从我的笔记本电脑运送到每个 Datflow 工作人员。

我怎样才能指示每个工作人员直接从云存储中加载矩阵?有“二进制斑点”的光束源吗?

【问题讨论】:

    标签: google-cloud-dataflow apache-beam


    【解决方案1】:

    你的方法是正确的。

    在这种情况下,Dataflow 所做的是将 NumPy 矩阵作为侧输入进行处理。这意味着它从您的机器上传到服务一次,Dataflow 服务会将其发送给每个工作人员。

    鉴于矩阵很大,这将使您的工作人员使用 I/O 从服务接收它,并承担将整个矩阵保存在内存中的负担,但它应该可以工作。


    如果您想避免在机器中计算/加载矩阵,您可以将矩阵作为文本文件上传到 GCS,读入该文件并获取矩阵。你可以这样做:

    matrix_file = 'gs://mybucket/my/matrix'
    p | beam.ParDo(ComputationDoFn(matrix_file))
    

    你的 DoFn 可能是这样的:

    class ComputationDoFn(beam.DoFn):
      def __init__(self, matrix_file):
        self._matrix_file = matrix_file
        self._matrix = None
    
      def start_bundle(self, element):
        # We check because one DoFn instance may be reused
        # for different bundles.
        if self._matrix is None:
          self.load_matrix(self._matrix_file)
    
      def process(self, element):
        # Now process the element
    
      def load_matrix(self, matrix_file):
        # Load the file from GCS using the GCS API
    

    我希望这是有道理的。如果您觉得需要更多帮助,我可以充实这些功能。

    【讨论】:

    • 有什么办法可以完全避免我的机器上有矩阵吗?让工作人员从云存储中获取矩阵吗?
    • 我为此添加了一个策略。您不会使用侧面输入 - 尽管您也可以使用侧面输入。
    • 如何加载一个矩阵作为side_input?你会使用AsSingleton 吗?我看不出它还能怎么挤进去,它在一维集合的意义上是不可迭代的。
    猜你喜欢
    • 1970-01-01
    • 2016-04-28
    • 1970-01-01
    • 1970-01-01
    • 2018-12-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-12-19
    相关资源
    最近更新 更多