【问题标题】:Python Apache Beam Google Storage Write errorPython Apache Beam Google 存储写入错误
【发布时间】:2017-10-06 10:55:38
【问题描述】:

我正在尝试将我的管道响应写入 Google 存储,但收到已安装在服务器上的模块导入错误。

代码:

from __future__ import print_function, absolute_import
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.transforms import PTransform, ParDo, DoFn, Create
from apache_beam.io import iobase, range_trackers
import logging
import re
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

def mongo_connection_string(url):
    import logging
    logger = logging.getLogger(__name__)
if 'gs://' in url:
    from google.cloud import storage
    logging.info('Fetching connection string from Cloud Storage {}'.format(url))
    _, path = url.split('gs://')
    path = path.split('/')
    bucket = path[0]
    path = '/'.join(path[1:])
    client = storage.Client()
    blob = client.get_bucket(bucket).get_blob(path).download_as_string()
    connection_string = blob.splitlines()[0]
    return connection_string
logger.info('Using connection string from CLI options')
return url

iso_match = re.compile(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}')

def clean_query(query):
    new_query = {}
    for key, val in query.iteritems():
        if isinstance(val, basestring):
            val = str(val)
    if isinstance(val, basestring) and iso_match.match(val):
        val = datetime.datetime.strptime(val[0:19], '%Y-%m-%dT%H:%M:%S')
    elif isinstance(val, dict):
        val = clean_query(val)
    new_query[str(key)] = val
return new_query

class _MongoSource(iobase.BoundedSource):
   import pymongo
   def __init__(self, connection_string, db, collection, query=None, fields=None):
    import logging
    logger = logging.getLogger(__name__)
    self._connection_string = connection_string
    self._db = db
    self._collection = collection
    self._fields = fields
    self._client = None

    # Prepare query
    self._query = query
    if not self._query:
        self._query = {}
    logger.info('Raw query: {}'.format(query))
    self._query = clean_query(self._query)
    logger.info('Cleaned query: {}'.format(self._query))

@property
def client(self):
    import logging
    import pymongo
    logger = logging.getLogger(__name__)
    if self._client:
        logger.info('Reusing existing PyMongo client')
        return self._client
    logger.info('Preparing new PyMongo client')
    self._client = pymongo.MongoClient(self._connection_string)
    return self._client

def estimate_size(self):
    return self.client[self._db][self._collection].count(self._query)

def get_range_tracker(self, start_position, stop_position):
    from apache_beam.io import iobase, range_trackers
    if start_position is None:
        start_position = 0
    if stop_position is None:
        stop_position = range_trackers.OffsetRangeTracker.OFFSET_INFINITY
    range_tracker = range_trackers.OffsetRangeTracker(start_position, stop_position)
    range_tracker = range_trackers.UnsplittableRangeTracker(range_tracker)

    return range_tracker

def read(self, range_tracker):
    coll = self.client[self._db][self._collection]
    for doc in coll.find(self._query, projection=self._fields):
        yield doc

def split(self, desired_bundle_size, start_position=None, stop_position=None):
    from apache_beam.io import iobase, range_trackers
    if start_position is None:
        start_position = 0
    if stop_position is None:
        stop_position = range_trackers.OffsetRangeTracker.OFFSET_INFINITY
    yield iobase.SourceBundle(
        weight=1,
        source=self,
        start_position=start_position,
        stop_position=stop_position)


class ReadFromMongo(PTransform):
    def __init__(self, connection_string, db, collection, query=None, fields=None):
        super(ReadFromMongo, self).__init__()
        self._connection_string = connection_string
        self._db = db
        self._collection = collection
        self._query = query
        self._fields = fields
        self._source = _MongoSource(
        self._connection_string,
        self._db,
        self._collection,
        query=self._query,
        fields=self._fields)

def expand(self, pcoll):
    import logging
    logger = logging.getLogger(__name__)
    logger.info('Starting MongoDB read from {}.{} with query {}'
                .format(self._db, self._collection, self._query))
    return pcoll | iobase.Read(self._source)

def display_data(self):
    return {'source_dd': self._source}


def transform_doc(document):
    data={str(document['clause type']):int(document['count'])}
    return data
def run():
    import time
    parser = argparse.ArgumentParser()
    parser.add_argument('--output',
                  dest='output',
                  default='<output path>',
                  help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args()
    gcs_path = "<gcs URL>"
    project_name = "<project name>"
    pipeline_args.extend(['--runner=DataflowRunner', 
    "--project=civic-eye-181513",
    "--staging_location=<stagging location>",
    "--temp_location=<temp location>"
    ])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as pipeline:
    print ("starting pipleline")
    connection_string = '<mongo URL>'
    (pipeline
     | "Load" >> ReadFromMongo(connection_string, 'new', 'Data', query={}, fields=['clause type','count'])
     | "transform" >> beam.Map(transform_doc).with_output_types(str)
     | "Save" >> WriteToText("{0}/output/wordcount{1}".format(gcs_path,int(time.time()))))
print ("done")     

if __name__ == '__main__':
    run()

错误:

Exception in worker loop: Traceback (most recent call last):
 File  "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 738, in run work,
  execution_context, env=self.environment)
 File  "/usr/local/lib/python2.7/dist-packages/dataflow_worker/workitem.py",  line 130, in get_work_items
  work_item_proto.sourceOperationTask.split)
 File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/workercustomsources.py", line 142, in  __init__
  source_spec[names.SERIALIZED_SOURCE_KEY]['value'])
 File  "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads return 
  dill.loads(s)
 File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
  return load(file)
 File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
  obj = pik.load()
 File "/usr/lib/python2.7/pickle.py", line 858, in load
  dispatch[key](self)
 File "/usr/lib/python2.7/pickle.py", line 1133, in load_reduce
  value = func(*args)
 File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 766, in _import_module
  return __import__(import_name)
ImportError: No module named pymongo

注意:Pymongo 模块已经安装了最新版本:

pip show pymongo    
Name: pymongo    
Version: 3.5.1    
Summary: Python driver for MongoDB <http://www.mongodb.org>    
Home-page: http://github.com/mongodb/mongo-python-driver    
Author: Bernie Hackett    
Author-email: bernie@mongodb.com    
License: Apache License, Version 2.0    
Location: /usr/local/lib/python2.7/dist-packages

谢谢

【问题讨论】:

    标签: python mongodb google-cloud-storage pymongo apache-beam


    【解决方案1】:

    当您使用一些非默认 python 库时,例如您自己的 utils 库或 pypi 的一些依赖项,那么您需要提供 requirementssetup 文件。您可以在this link查看有关它的详细信息

    这样做的原因是,当您向数据流提交作业时,您的代码实际上是在数据流服务为您启动的不同计算引擎上运行的。您使用的所有依赖项都需要安装在它们上。这可以通过提供requirementssetup 文件来实现。

    由于您使用的是 pypi 依赖项,因此您需要做的就是

    1. 通过执行pip freeze&gt;requirements.txt 创建需求文件
    2. 将该需求文件提供给管道选项

    要提供需求文件,请使用以下代码提供参数

    requirements_file = "/path/to/requirements_file"
    pipeline_options.view_as(SetupOptions).requirements_file = requirements_file
    

    所以你的运行函数应该如下所示

    def run():
        import time
        parser = argparse.ArgumentParser()
        parser.add_argument('--output',
                      dest='output',
                      default='<output path>',
                      help='Output file to write results to.')
        known_args, pipeline_args = parser.parse_known_args()
        gcs_path = "<gcs URL>"
        project_name = "<project name>"
        pipeline_args.extend(['--runner=DataflowRunner', 
        "--project=civic-eye-181513",
        "--staging_location=<stagging location>",
        "--temp_location=<temp location>"
        ])
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    requirements_file = "/path/to/requirements_file"
    pipeline_options.view_as(SetupOptions).requirements_file = requirements_file
    with beam.Pipeline(options=pipeline_options) as pipeline:
        print ("starting pipleline")
        connection_string = '<mongo URL>'
        (pipeline
         | "Load" >> ReadFromMongo(connection_string, 'new', 'Data', query={}, fields=['clause type','count'])
         | "transform" >> beam.Map(transform_doc).with_output_types(str)
         | "Save" >> WriteToText(" {0}/output/wordcount{1}".format(gcs_path,int(time.time()))))
    print ("done")
    

    如果您需要使用一些自定义编写的 python 包,例如您自己的 utils 文件,那么您需要做的就是使用setuptools 创建安装文件,并以与requirements 文件类似的方式提供它。

    您可以在this link 阅读有关setuptools 的信息

    【讨论】:

    • 现在出现此错误:数据流似乎卡住了。请通过 stackoverflow.com/questions/tagged/google-cloud-dataflow 与 Dataflow 团队联系。
    • 您使用的 Apache Beam 是什么版本的?
    • Apache-beam 2.1.0 有一个已知问题,它安装了与它不兼容的包 six 版本 1.11。您需要将软件包降级到 1.10 版。显然 apache-beam 2.1.1 会处理它。
    • 我使用的是2.1.1版本
    • 查看 Google Cloud Logging 上的 worker-startup 日志以了解特定的数据流作业
    猜你喜欢
    • 2019-03-29
    • 2017-07-13
    • 2022-07-14
    • 2018-10-17
    • 2020-06-15
    • 2021-03-25
    • 1970-01-01
    • 1970-01-01
    • 2022-08-19
    相关资源
    最近更新 更多