【问题标题】:include other files with dataflow在数据流中包含其他文件
【发布时间】:2020-03-28 06:54:04
【问题描述】:

我的数据流使用 .sql 文件。该文件包含一个查询,它位于名为 queries 的目录中。

我需要将此文件与我的数据流一起上传。

我发现使用了 manifest.in 文件,但据我所知,这并没有做任何事情,我在我的根目录中创建了这个名为 MANIFEST.in 的文件,它包含一行:

recursive-include queries *

其他一些消息来源告诉我,我需要为此使用 setup.py 文件。所以现在看起来像这样:

from __future__ import absolute_import
from __future__ import print_function

import subprocess
from distutils.command.build import build as _build

import setuptools  # pylint: disable-all
setuptools.setup(
    name='MarkPackage',
    version='0.0.1',
    install_requires=[],
    packages=setuptools.find_packages(),
    package_data={
        'queries': ['queries/*'],
    },
    include_package_data=True
)

这也行不通。 错误是:RuntimeError: FileNotFoundError: [Errno 2] No such file or directory: 'queries/testquery.sql' [while running 'generatedPtransform-20']

在我的数据流的任何部分或所有部分中包含要使用的任何文件的最佳做法是什么?

【问题讨论】:

    标签: python google-cloud-dataflow apache-beam


    【解决方案1】:

    这个解决方案是由我们的 Google Cloud 顾问提供给我的。它有效,但建议不要这样做,因为它增加了复杂性,只是为了将 SQL 查询与 Python 代码分开。 另一种方法是在 Bigquery 上创建一个包含此 SQL 代码的视图,并在 Bigquery 环境中对其进行维护。

    MANIFEST.in
    include query.sql

    setup.py

    import setuptools
    setuptools.setup(
        name="example",
        version="0.0.1",
        install_requires=[],
        packages=setuptools.find_packages(),
        data_files=[(".", ["query.sql"])],
        include_package_data=True,
    )
    

    ma​​in.py

    with open ("query.sql", "r") as myfile:
            query=myfile.read()
        with beam.Pipeline(argv=pipeline_args) as p:
            rows = p | "ReadFromBQ" >> beam.io.Read(
                beam.io.BigQuerySource(query=query, use_standard_sql=True)
            )
            rows | "writeToBQ" >> beam.io.Write(
                "BQ Write"
                >> beam.io.WriteToBigQuery(
                    known_args.output_table,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                )
            )
    

    【讨论】:

    • data_files 是关键
    【解决方案2】:

    这取决于您对要包含的文件所做的操作,但考虑到这是一个 SQL 文件(而不是本地 Python 包或非 Python 依赖项),“包含”它的一种方法是将其放入在 Google Cloud Storage 存储桶中并将其添加为参数:

    def run(argv=None): 
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--input',
            dest='input',
            default='gs://bucket/queries/query.sql',
            help='Input SQL file.'
            )
        known_args, pipeline_args = parser.parse_known_args(argv)
        pipeline_args.extend([
              '--runner=DataflowRunner',
              '--project=proj',
              '--region=region',
              '--staging_location=gs://bucket/staging/',
              '--temp_location=gs://bucket/temp/',
              '--job_name=name',
              '--setup_file=./setup.py'
              ]) 
    

    现在如果您需要将此文件用作PTransform 中的参数,您可以将known_args.input 传递给它。希望这会有所帮助

    【讨论】:

    • 这非常适用于我需要逐行解析的文本文件,但我不确定您是如何使用 SQL 文件的,因此可能会将其作为参数传递给 @987654324 @ 不适合您的用例。
    • 这是一个非常聪明的解决方案(赞成),但我认为必须有一个更简单的解决方案,不需要添加服务组件(存储)。
    【解决方案3】:

    请考虑使用filesToStage,遵循existing SO answer 中描述的模式。这将允许您提供文件。这种方法有一些“陷阱”,因此请仔细查看答案。

    不幸的是,simplest solution I found 是一个特定于 java 的解决方案。使用资源文件夹将配置文件打包到 jar 中。然后使用 java 提供的 API 将文件读回。

    【讨论】:

      猜你喜欢
      • 2021-11-09
      • 2011-06-24
      • 1970-01-01
      • 1970-01-01
      • 2018-05-22
      • 2015-07-28
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多