【问题标题】:How to use matplotlib module in Apache Beam Google DataFlow runner如何在 Apache Beam Google DataFlow 运行器中使用 matplotlib 模块
【发布时间】:2017-09-13 07:16:27
【问题描述】:

是否可以从 google 数据流 (beam) 中获取 matplotlib 模块?我的 requirements.txt 中有它:

matplotlib==2.0.2

但还是报错:

ImportError: No module named matplotlib

谢谢!

【问题讨论】:

    标签: python matplotlib google-cloud-dataflow apache-beam


    【解决方案1】:

    要准备自定义 DataFlow 工作器,您应该提供带有命令的 setup.py 文件以安装所需的软件包。 首先,创建setup.py 文件,如下所示(它是一个通用的setup.py 文件)。 您应该在REQUIRED_PACKAGES 变量中列出您的包,或者像我一样将pip install matplotlib==2.0.2 放入CUSTOM_COMMANDS

    请注意,ma​​tplotlib 需要在系统中安装一些额外的包/库,因此您也需要通过为它们指定安装命令来安装它们。此外,如果您想在 DataFlow 作业中渲染绘图,您需要将 matplotlib 后端配置为一个,以便能够写入文件输出(请参阅How can I set the 'backend' in matplotlib in Python?)。

    然后,创建setup.py 文件后,只需指定Apache Beam 管道参数:

    import apache_beam as beam
    p = beam.Pipeline("DataFlowRunner", argv=[
    '--setup_file', './setup.py',
    # put other parameters here
    ])
    

    通用setup.py 文件:

    import sys
    import os
    import logging
    import subprocess
    import pickle
    
    import setuptools
    import distutils
    
    from setuptools.command.install import install as _install
    
    
    
    class install(_install):  # pylint: disable=invalid-name
        def run(self):
            self.run_command('CustomCommands')
            _install.run(self)
    
    CUSTOM_COMMANDS = [
        ['pip', 'install', 'matplotlib==2.0.2'],
    ]
    
    
    class CustomCommands(setuptools.Command):
        """A setuptools Command class able to run arbitrary commands."""
    
        def initialize_options(self):
            pass
    
        def finalize_options(self):
            pass
    
        def RunCustomCommand(self, command_list):
            logging.info('Running command: %s' % command_list)
            p = subprocess.Popen(
                command_list,
                stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            # Can use communicate(input='y\n'.encode()) if the command run requires
            # some confirmation.
            stdout_data, _ = p.communicate()
            logging.info('Command output: %s' % stdout_data)
            if p.returncode != 0:
                raise RuntimeError(
                    'Command %s failed: exit code: %s' % (command_list, p.returncode))
    
        def run(self):
            for command in CUSTOM_COMMANDS:
                self.RunCustomCommand(command)
    
    
    REQUIRED_PACKAGES = [
    
    ]
    
    
    setuptools.setup(
        name='name',
        version='1.0.0',
        description='DataFlow worker',
        install_requires=REQUIRED_PACKAGES,
        packages=setuptools.find_packages(),
        cmdclass={
            'install': install,
            'CustomCommands': CustomCommands,
            }
        )
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-10-20
      • 2021-05-10
      • 2021-11-21
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多