【问题标题】:Import CSV into google cloud datastore将 CSV 导入谷歌云数据存储
【发布时间】:2018-01-27 05:50:04
【问题描述】:

我有一个包含 2 列和 20,000 行的 CSV 文件,我想将其导入 Google Cloud Datastore。我是 Google Cloud 和 NoSQL 数据库的新手。我曾尝试使用数据流,但需要提供 Javascript UDF 函数名称。有人有这方面的例子吗?一旦它在数据存储中,我将查询这些数据。 任何有关如何创建它的建议或指导将不胜感激。

【问题讨论】:

    标签: csv google-app-engine google-cloud-datastore google-cloud-dataflow


    【解决方案1】:

    使用 Apache Beam,您可以使用 TextIO 类读取 CSV 文件。请参阅TextIO 文档。

    Pipeline p = Pipeline.create();
    
    p.apply(TextIO.read().from("gs://path/to/file.csv"));
    

    接下来,应用将解析 CSV 文件中的每一行并返回 Entity 对象的转换。根据您希望如何存储每一行​​,构造适当的Entity 对象。 This page 有一个如何创建Entity 对象的示例。

    .apply(ParDo.of(new DoFn<String, Entity>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String row = c.element();
            // TODO: parse row (split) and construct Entity object
            Entity entity = ...
            c.output(entity);
        }
    }));
    

    最后,将 Entity 对象写入 Cloud Datastore。请参阅DatastoreIO 文档。

    .apply(DatastoreIO.v1().write().withProjectId(projectId));
    

    【讨论】:

    • 谢谢安德鲁。我有一个轿跑车菜鸟问题。我查看了 TextIO 文档并有一个问题。我将在哪里运行 TextIO?在 Apache Beam 中还是在 Dataflow 中?另外,我将在哪里应用转换并将实体写入 Cloud Datastore?我看到我可以在 Dataflow 中运行作业。你指的是这个吗?
    • Apache Beam 是一种用于定义管道的编程模型。管道可以在 Cloud Dataflow 等执行引擎上运行。你实际上并没有“运行”TextIO。您使用 Apache Beam SDK 定义管道。例如,使用 Java SDK,TextIO.Read 是输入转换,DatastoreV1.Read 是输出转换。您可以在两者之间应用任何转换来实现 ETL 逻辑。一旦定义/实现了管道,就可以部署/运行它。
    【解决方案2】:

    在 python 中很简单,但可以很容易地适应其他语言。使用split() 方法循环遍历行和逗号分隔的值:

    from google.appengine.api import urlfetch
    from my.models import MyModel
    
    csv_string   = 'http://someplace.com/myFile.csv'
    csv_response = urlfetch.fetch(csv_string, allow_truncated=True) 
    
    if csv_response.status_code == 200:
        for row in csv_response.content.split('\n'):
            row_values = row.split(',')
            # csv values are strings.  Cast them if they need to be something else
            new_entry = MyModel(
                property1 = row_values[0],
                property2 = row_values[1]
            )
            new_entry.put()
    
    else:
        print 'cannot load file: {}'.format(csv_string)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-06-26
      • 1970-01-01
      • 2022-08-16
      • 2015-07-17
      • 2017-09-26
      • 1970-01-01
      • 1970-01-01
      • 2017-05-11
      相关资源
      最近更新 更多