【Question title】: Apache Beam Google Datastore ReadFromDatastore entity protobuf
【Posted】: 2017-05-22 16:15:29
【Question】:

I am trying to read from Datastore using Apache Beam's Google Datastore API:

p = beam.Pipeline(options=options)
(p
 | 'Read from Datastore' >> ReadFromDatastore(gcloud_options.project, query)
 | 'reformat'            >> beam.Map(reformat)
 | 'Write To Datastore'  >> WriteToDatastore(gcloud_options.project))

The object passed to my reformat function is of type

google.cloud.proto.datastore.v1.entity_pb2.Entity

It is in protobuf format, which is hard to modify or read.

I thought I could convert an entity_pb2.Entity to a dict with

entity= dict(google.cloud.datastore.helpers._property_tuples(entity_pb))

But for some reason, importing the following two libraries together gives me an error:

import google.cloud.datastore.helpers  
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore 

Error:

Traceback (most recent call last):
  File "/home/nburn42/MotoGarage/MotoGarage/MotoGarageBackgroundJobs/format_data.py", line 16, in <module>
    import google.cloud.datastore.helpers
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/__init__.py", line 57, in <module>
    from google.cloud.datastore.batch import Batch
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/batch.py", line 24, in <module>
    from google.cloud.datastore import helpers
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/helpers.py", line 29, in <module>
    from google.cloud.grpc.datastore.v1 import entity_pb2 as _entity_pb2
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/grpc/datastore/v1/entity_pb2.py", line 28, in <module>
    dependencies=[google_dot_api_dot_annotations__pb2.DESCRIPTOR,google_dot_protobuf_dot_struct__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,google_dot_type_dot_latlng__pb2.DESCRIPTOR,])
  File "/usr/local/lib/python2.7/dist-packages/google/protobuf/descriptor.py", line 824, in __new__
    return _message.default_pool.AddSerializedFile(serialized_pb)
TypeError: Couldn't build proto file into descriptor pool!
Invalid proto descriptor for file "google/cloud/grpc/datastore/v1/entity.proto":
  google.datastore.v1.PartitionId.project_id: "google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.PartitionId.namespace_id: "google.datastore.v1.PartitionId.namespace_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.PartitionId: "google.datastore.v1.PartitionId" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.partition_id: "google.datastore.v1.Key.partition_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.path: "google.datastore.v1.Key.path" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement.id_type: "google.datastore.v1.Key.PathElement.id_type" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement.kind: "google.datastore.v1.Key.PathElement.kind" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement.id: "google.datastore.v1.Key.PathElement.id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement.name: "google.datastore.v1.Key.PathElement.name" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement: "google.datastore.v1.Key.PathElement" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key: "google.datastore.v1.Key" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.ArrayValue.values: "google.datastore.v1.ArrayValue.values" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.ArrayValue: "google.datastore.v1.ArrayValue" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.value_type: "google.datastore.v1.Value.value_type" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.null_value: "google.datastore.v1.Value.null_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.boolean_value: "google.datastore.v1.Value.boolean_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.integer_value: "google.datastore.v1.Value.integer_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.double_value: "google.datastore.v1.Value.double_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.timestamp_value: "google.datastore.v1.Value.timestamp_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.key_value: "google.datastore.v1.Value.key_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.string_value: "google.datastore.v1.Value.string_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.blob_value: "google.datastore.v1.Value.blob_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.geo_point_value: "google.datastore.v1.Value.geo_point_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.entity_value: "google.datastore.v1.Value.entity_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.array_value: "google.datastore.v1.Value.array_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.meaning: "google.datastore.v1.Value.meaning" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.exclude_from_indexes: "google.datastore.v1.Value.exclude_from_indexes" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value: "google.datastore.v1.Value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.key: "google.datastore.v1.Entity.key" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.properties" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry.key: "google.datastore.v1.Entity.PropertiesEntry.key" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Entity.PropertiesEntry.value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry: "google.datastore.v1.Entity.PropertiesEntry" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity: "google.datastore.v1.Entity" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.partition_id: "google.datastore.v1.PartitionId" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Key.path: "google.datastore.v1.Key.PathElement" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.ArrayValue.values: "google.datastore.v1.Value" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.key_value: "google.datastore.v1.Key" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.entity_value: "google.datastore.v1.Entity" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.array_value: "google.datastore.v1.ArrayValue" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Value" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.key: "google.datastore.v1.Key" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.PropertiesEntry" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.

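Is there anything I can do to convert an entity_pb2.Entity into something usable?
Is ReadFromDatastore still too new to really use?
Should I use a different approach?

Thanks,
Nathan

【Comments】:

  • Try looking at the com.google.datastore.v1 package and

Tags: python google-cloud-datastore protocol-buffers google-cloud-dataflow apache-beam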


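【Answer 1】:

I ran into the same problem, and the accepted answer did not work for me.

The OP asks three questions:

1. Is there anything I can do to convert an entity_pb2.Entity into something usable?

You did not say exactly what difficulty you had with the returned value, but every instance of entity_pb2.Entity should have a properties attribute, which you can use to get values from your entity. For example: property_value = entity.properties.get('<your_property_name>')


Update: I think I now see what the OP meant by "usable". Even after property_value = entity.properties.get('<your_property_name>'), the value you get in property_value is still in protocol buffer format. So to get a dict of the properties, you can do this: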

from googledatastore import helper

value_dict = dict((prop_name, helper.get_value(entity.properties.get(prop_name)),) for prop_name in entity.properties)
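
For anyone who wants to see roughly what that unpacking amounts to, here is a minimal sketch. It assumes each property value is a protobuf Value message whose populated field sits in the value_type oneof (the field names are the ones listed in the traceback in the question). value_to_python and properties_to_dict are hypothetical names, not part of any library, and FakeValue is only a stand-in so the snippet runs without Datastore installed. Timestamps, keys and geo points come back as their protobuf messages, unconverted.

```python
from types import SimpleNamespace


def value_to_python(value_pb):
    """Return the plain Python value held by a Datastore Value-like message."""
    # WhichOneof names the single populated field of the oneof, or None.
    field = value_pb.WhichOneof('value_type')
    if field is None or field == 'null_value':
        return None
    raw = getattr(value_pb, field)
    if field == 'array_value':
        return [value_to_python(item) for item in raw.values]
    if field == 'entity_value':
        # Embedded entities are unpacked recursively.
        return properties_to_dict(raw)
    return raw


def properties_to_dict(entity_pb):
    """Build {property name: Python value} from an Entity-like message."""
    return {name: value_to_python(value)
            for name, value in entity_pb.properties.items()}


class FakeValue(object):
    """Stand-in for a protobuf Value (assumption: for illustration only)."""

    def __init__(self, field=None, raw=None):
        self._field = field
        if field is not None:
            setattr(self, field, raw)

    def WhichOneof(self, group):
        return self._field


fake_entity = SimpleNamespace(properties={
    'name': FakeValue('string_value', 'John'),
    'age': FakeValue('integer_value', 18),
    'tags': FakeValue('array_value', SimpleNamespace(values=[
        FakeValue('string_value', 'a'),
        FakeValue('string_value', 'b'),
    ])),
    'missing': FakeValue(),
})
print(properties_to_dict(fake_entity))
```

With a real entity you would pass the object received by the reformat function straight to properties_to_dict.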

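2. Is ReadFromDatastore too new to really use?

I thought so at first too, but I now seem to have it working (see my answer to question 3 below).

3. Should I use a different approach?

You must not import the google-cloud-datastore library into your project. Doing so causes the TypeError: Couldn't build proto file into descriptor pool! error from the original question when you import ReadFromDatastore from apache_beam.

From my investigation and debugging, the current version of the apache-beam library (v2.8.0) appears to be incompatible with the google-cloud-datastore library (v1.7.1). This means we have to use the bundled googledatastore library (v7.0.1) instead to achieve what we want.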
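
To check which of these libraries are actually present in an environment, something along these lines may help. It is only a sketch: it assumes Python 3.8+ for importlib.metadata (the traceback in the question is from Python 2.7), and installed_versions and conflict_risk are hypothetical helper names. Treating "both Datastore libraries installed" as a risk simply encodes the observation above. It is not an official compatibility rule.

```python
from importlib import metadata

# Distribution names mentioned in this thread.
PACKAGES = ('apache-beam', 'google-cloud-datastore', 'googledatastore', 'protobuf')


def installed_versions(names=PACKAGES, lookup=metadata.version):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in names:
        try:
            versions[name] = lookup(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


def conflict_risk(versions):
    """True when both Datastore client libraries are installed side by side."""
    return (versions.get('googledatastore') is not None
            and versions.get('google-cloud-datastore') is not None)


if __name__ == '__main__':
    found = installed_versions()
    for name, version in sorted(found.items()):
        print('%s: %s' % (name, version or 'not installed'))
    if conflict_risk(found):
        print('Both Datastore libraries are installed; importing both may fail.')
```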

Further reading / references:

https://cloud.google.com/blog/products/gcp/how-to-do-data-processing-and-analytics-from-google-app-engine-with-google-cloud-dataflow

https://github.com/amygdala/gae-dataflow

https://gcloud-python.readthedocs.io/en/0.10.0/_modules/gcloud/datastore/helpers.html

【Comments】:

  • google-cloud-datastore and googledatastore seem able to coexist to some extent if you add ['pip uninstall -y protobuf', 'pip install protobuf==3.7.1 --no-binary=protobuf'] to the custom_commands section of setup.py.
  • As an additional FYI, the ID can be accessed via element.key.path[-1].id
【Answer 2】:

Another (simpler) way to specify the query is as follows:

import apache_beam as beam
from google.cloud import datastore
from google.cloud.datastore import query as datastore_query
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore

p = beam.Pipeline(options=pipeline_options)
ds_client = datastore.Client(project=project)
query = ds_client.query(kind=kind)
# possible filter: query.add_filter('column','operator',criteria) 
# query.add_filter('age','>',18)
# query.add_filter('name','=',"John")
query = datastore_query._pb_from_query(query)

p | 'ReadFromDatastore' >> ReadFromDatastore(project=project, query=query)
p.run().wait_until_finish()

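When submitting the job to the DataflowRunner (in the cloud), make sure your local requirements match the setup.py file you ship to Google Cloud. In my experience, I had to install Apache Beam 2.1.0 on my local machine and then specify the same version in setup.py for it to work on the cloud workers.

【Comments】:

  • In my case specifically, google-cloud-datastore was unversioned in my setup.py, while my local install had an older pinned version. This caused a conflict when running in the cloud. Pinning both to the same version fixed it for me, thanks!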
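
A quick way to spot that kind of drift is to compare the two requirement lists before submitting the job. The sketch below is only an illustration: parse_pins and pin_mismatches are hypothetical helpers, it understands only plain name==version strings, and the version numbers are just the ones mentioned in this thread.

```python
def parse_pins(requirements):
    """Map each requirement name to its '==' version, or None if unpinned."""
    pins = {}
    for requirement in requirements:
        name, separator, version = requirement.partition('==')
        pins[name.strip().lower()] = version.strip() if separator else None
    return pins


def pin_mismatches(local, remote):
    """Return {name: (local pin, remote pin)} for packages whose pins differ."""
    local_pins = parse_pins(local)
    remote_pins = parse_pins(remote)
    return {
        name: (local_pins[name], remote_pins[name])
        for name in local_pins
        if name in remote_pins and local_pins[name] != remote_pins[name]
    }


# Illustrative lists: what is installed locally vs. what setup.py requests.
local_requirements = ['apache-beam==2.1.0', 'google-cloud-datastore==1.7.1']
setup_py_requirements = ['apache-beam==2.1.0', 'google-cloud-datastore']

print(pin_mismatches(local_requirements, setup_py_requirements))
```

An empty result means every package named in both lists is pinned to the same version in each.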
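【Answer 3】:

You can use the google.cloud.datastore.helpers.entity_from_protobuf function to convert an entity_pb2.Entity into a google.cloud.datastore.entity.Entity.

google.cloud.datastore.entity.Entity is a subclass of dict, which gives you the usability you are looking for.

【Comments】:

  • This solution works well for Apache Beam running on a local machine. However, when the job is pushed to the DataflowRunner, it fails, saying that datastore cannot be found in google.cloud. Is this because Datastore support in the Apache Beam Python SDK is still in beta?
  • When you deploy a pipeline to Google Dataflow workers with Apache Beam, your Python environment is not replicated on the worker machines. The problem is that the workers do not have the google-cloud-datastore package installed by default, whereas your local environment does. For instructions on specifying PyPI dependencies, see: cloud.google.com/dataflow/pipelines/dependencies-python
【Answer 4】:

The latest version of Apache Beam, 2.13, deprecates the old approach based on the older googledatastore library and adds a new implementation that uses the newer, more user-friendly google-cloud-datastore library.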

https://beam.apache.org/releases/pydoc/2.13.0/apache_beam.io.gcp.datastore.v1new.datastoreio.html

https://github.com/apache/beam/pull/8262

There is still an open issue for adding examples, so for now you will have to work that part out yourself.

https://issues.apache.org/jira/browse/BEAM-7350
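
Until official examples land, here is an unofficial sketch of how the question's pipeline might look with the v1new module. It assumes apache-beam[gcp] 2.13 or later is installed, that v1new's ReadFromDatastore takes a types.Query and yields types.Entity objects with key and properties attributes, and that a 'name' property exists on the entities. uppercase_name, reformat and build_pipeline are hypothetical names. The Beam imports are deferred into the functions so the pure helper can be tried without Beam installed.

```python
def uppercase_name(properties):
    """Pure helper: copy the property dict and upper-case its 'name' value."""
    updated = dict(properties)
    if isinstance(updated.get('name'), str):
        updated['name'] = updated['name'].upper()
    return updated


def reformat(entity):
    """Build a new v1new Entity with the modified properties (assumed API)."""
    from apache_beam.io.gcp.datastore.v1new.types import Entity

    new_entity = Entity(entity.key)
    new_entity.set_properties(uppercase_name(entity.properties))
    return new_entity


def build_pipeline(project, kind, options=None):
    """Read every entity of `kind`, reformat it, and write it back."""
    import apache_beam as beam
    from apache_beam.io.gcp.datastore.v1new.datastoreio import (
        ReadFromDatastore, WriteToDatastore)
    from apache_beam.io.gcp.datastore.v1new.types import Query

    pipeline = beam.Pipeline(options=options)
    (pipeline
     | 'Read from Datastore' >> ReadFromDatastore(Query(kind=kind, project=project))
     | 'reformat' >> beam.Map(reformat)
     | 'Write To Datastore' >> WriteToDatastore(project))
    return pipeline


# The property handling can be checked on a plain dict, without Beam:
print(uppercase_name({'name': 'John', 'age': 18}))
```

Calling build_pipeline(project, kind).run().wait_until_finish() would then execute it. Check the signatures against the pydoc linked above before relying on this.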
