【问题标题】:How to use mapreduce to bulk update datastore entities that satisfy a query?如何使用 mapreduce 批量更新满足查询的数据存储实体?
【发布时间】:2015-01-15 07:22:34
【问题描述】:

我想使用 mapreduce 库来更新所有满足查询的实体。有几个并发症:

  1. 查找要更新的实体的查询检查是否存在 特定属性“property1”包含在一个长长的值列表中(~10000 条目)来自 csv 文件
  2. 对于每个满足查询的实体,需要将另一个属性“property2”更新为等于 csv 文件第二列和同一行中的值

我知道如何将 csv 文件上传到 Blobstore 并使用 Blobstore 输入阅读器读取每一行。我也知道使用查询获取实体的 Datastore 输入阅读器。

我的问题是如何创建一个从 Blobstore 读取输入数据、获取数据存储实体并尽可能高效地更新它们的 Mapper 类?

【问题讨论】:

  • 我怀疑 map-reduce 能否提供您想要的性能。通常 map-reduce 对可以自然分解的数据进行操作;但在您的情况下,由于您正在使用 CSV 文件,因此使用的大部分时间和内存将用于将该 CSV 文件解析为多行(多个字符串);或者,如果您将该 CSV 字符串视为单个流,那么您会被 readLine() 的串行操作阻止。由于与任务划分的成本相比,您的处理并不耗时(如果不是,请纠正我),我认为 map reduce 不会在任何方面为您带来好处。
  • 我希望至少从批量获取和放入数据存储中受益,因为替代方法是获取每个完整的实体,更改属性并放回数据存储。
  • 虽然 map-reduce,MR,非常适合在许多实体上工作,但它还需要处理 csv 可能会减慢速度。一种选择是将 CSV 也加载到数据存储中,对所有实体执行 MR,然后在映射器中对 CSV 类型执行 .get 以查看传递的实体是否存在。如果是更新,则跳过。不是最好的方法,但我能想到的唯一方法。仅供参考,您可以使用 put_multi 进行批处理cloud.google.com/appengine/docs/python/ndb/functions
  • 这应该可以帮助你。 ikaisays.com/2010/08/11/…
  • @Sridhar,该教程只是根据 CSV 中的信息创建新实体。就我而言,我需要找到与 CSV 的每一行对应的现有数据存储实体并对其进行更新。那么,我应该查询数据存储区约 10,000 次以获取每行的实体还是有更好的方法?

标签: java google-app-engine mapreduce google-cloud-datastore blobstore


【解决方案1】:

鉴于 property1 的可能值列表很长,使用查询进行过滤似乎不是一个好的选择(因为您需要使用 IN 过滤器,实际上是 @987654321 @)

使用 MR 的另一种方法是使用 Map(从 property1property2)将 CSV 加载到内存中,然后触发迭代所有实体的 MR 作业,如果他们的 property1 是 Map 上的 Keys 的一部分,则使用映射的值对其进行修改。

正如@Ryan B 所说,如果您只想利用批量放置,则不需要为此使用 MR,因为您可以使用 DatastoreService 使用 Iterableput

【讨论】:

  • 谢谢!我同意对这么长的列表使用带有 IN 过滤器的查询将是非常低效的。我有几个简单的问题: 1. 您知道将 CSV 加载到地图中的最佳方式吗? 2. 遍历map的key并从datastore中获取对应的entity,而不是遍历所有datastore的实体来查看它们是否在map中不是更好吗? 3. 如果我只是使用batch puts,我怎么知道哪些操作成功完成了?
  • 1.取决于工作:如果它是一次性的,您甚至可能希望以编程方式加载(即使用脚本加载 CSV 并输出填充地图的 java 代码)。如果没有,请使用 Blobstore,获取文件并在后台任务上迭代(并可能将其存储在 Memcache 中) 2. 好吧,这取决于您的地图与实体的关系有多稀疏,以及您是否拥有和索引超过 property1 与否……但是,是的,另一个选择是。 3.阅读put关于事务管理的文档:你可以决定提交/回滚或者让默认配置工作。
【解决方案2】:

您可以使用 DatastoreInputReader,并在 map 函数中,确定 property1 是否实际上在 csv 中:每次从 csv 读取会很慢,您可以做的是使用 memcache 在它之后提供该信息从它自己的 Datastore 模型中只读取一次。要填充数据存储模型,我建议使用 property1 作为每行的自定义 Id,这样,查询它就很简单了。您只会为那些实际更改的值更新数据存储区,并使用突变池使其具有性能(op.db.Put())。我给你留下伪代码(对不起......我只有在 python 中)不同部分的外观,我进一步建议你阅读这篇关于 Google App Engine 上 Mapreduce 的文章:http://sookocheff.com/posts/2014-04-15-app-engine-mapreduce-api-part-1-the-basics/

#to get the to_dict method
from google.appengine.ext import ndb
from mapreduce import operation as op 
from mapreduce.lib import pipeline
from mapreduce import mapreduce_pipeline

class TouchPipeline(pipeline.Pipeline):
    """
    Pipeline to update the field of entities that have certain condition
    """

    def run(self, *args, **kwargs):
        """ run """
        mapper_params = {
            "entity_kind": "yourDatastoreKind",
        }
        yield mapreduce_pipeline.MapperPipeline(
            "Update entities that have certain condition",
            handler_spec="datastore_map",
            input_reader_spec="mapreduce.input_readers.DatastoreInputReader",
            params=mapper_params,
            shards=64)


class csvrow(ndb.Model):
  #you dont store property 1 because you are going to use its value as key
  substitutefield=ndb.StringProperty()

def create_csv_datastore():
  # instead of running this, make a 10,000 row function with each csv value, 
  # or read it from the blobstore, iterate and update the values accordingly
  for i in range(10000):
    #here we are using our own key as id of this row and just storing the other column that
    #eventually will be subtitute if it matches
    csvrow.get_or_insert('property%s' % i, substitutefield = 'substitute%s').put()


def queryfromcsv(property1):
  csvrow=ndb.Key('csvrow', property1).get()
  if csvrow:
    return csvrow.substitutefield
  else:
    return property1

def property1InCSV(property1):
  data = memcache.get(property1)
  if data is not None:
      return data
  else:
      data = self.queryfromcsv(property1)
      memcache.add(property1, data, 60)
      return data

def datastore_map(entity_type):
  datastorepropertytocheck = entity_type.property1
  newvalue = property1InCSV(datastorepropertytocheck)
  if newvalue!=datastoreproperty:
    entity_type.property11 = newvalue
    #use the mutation pool
    yield op.db.Put(entity)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2022-01-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-09-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多