【问题标题】:Streaming data into Big Query将数据流式传输到 Bigquery
【发布时间】:2016-01-25 05:14:34
【问题描述】:

我正在尝试将数据从 Scala 应用程序流式传输到 BQ。查看Streaming Data Into BigQuery 中列出的示例,我发现数据需要使用TableDataInsertAllRequest.Rows().setJson() 作为Map<String, Object> 传入。

  1. 这是传入数据的唯一方法吗?
  2. 鉴于这表示将由 BQ 连接器库作为 JSON 流式传输的数据,是否可以以 JSON 化字符串格式而不是 Map<String, Object> 传递数据?如果没有,有什么原因吗?

【问题讨论】:

  • 当然。希望从 BQ 团队获得更多的答案/见解。 cloud.google.com/bigquery/bigquery-api-quickstart 表示 Google 工程师会监控此标签。
  • 他们确实做到了,但他们可能不是发布连接器库的人。实际上,您应该编辑原始问题并命名您使用的连接器库。

标签: google-bigquery


【解决方案1】:

很遗憾,通过我们(或任何 Google Cloud Platform)API 生成的库不支持直接写出请求正文。这可能有助于确保请求的有效性。也就是说,在客户端库方面有积极的工作,辅助方法似乎是一个合理的要求。出于上述验证目的,开销可能仍然存在(解析为客户端表示),但客户端接口对于您的场景会更简单一些。

我会转达您的请求。同时,这个问题的答案提到了一个似乎可以简化您的翻译工作的库:

Convert Json to Map

【讨论】:

  • 糟糕。刚刚注意到您正在使用 Scala 工作。也许存在一些等效的库,或者可能存在简单的互操作?快速谷歌搜索暗示它应该是:scala-lang.org/old/faq/4
  • 我的应用是数据推送到 BQ 之前数据管道中的最后一英里。如果需要进一步处理数据,它会将传入的 JSON 转换为模型(Scala 案例类),否则可以按原样流式传输。我相信被推送的内容是在 BQ 端而不是在客户端库中验证的(针对表模式)。此外,BQ 客户端库必须在传输之前将 Map 序列化为某种形式(如果客户端库使用 REST,则很可能是 JSON)。所以,我在想如果客户端应用程序只是充当管道,那么有一种方法可以传递 JSON 字符串可能会简化事情。会想办法解决的!
  • 您是正确的,我们根据表的模式验证数据结构。然而,我的意思只是客户端中的一般请求结构验证——格式良好的 json,格式良好的正文(因为还有其他字段,如插入 id 等)。我已经提到向我们的客户端库团队添加了一个瘦辅助方法,所以我们将看看它的去向。但是,我上面提到的库至少应该让您以最少的逻辑调用现有库 - 使用 json-to-map 库获取地图,根据请求设置地图。
【解决方案2】:

我认为您应该能够通过 gcloud-java 中的 BigQuery api 使用 TableDataWriteChannel 流式传输 json 内容。

这意味着它也应该可以在没有 gcloud-java(并直接使用 api-client)的情况下实现,尽管您可能需要重复一些代码 图书馆正在为您服务。

我强烈建议您查看gcloud-java,并随时添加feature request,以便在instertAll 操作中也支持json 内容。

【讨论】:

    【解决方案3】:

    我还建议您查看 gcloud-java 中的 BigQuery api。在 gcloud-java 中,您可以使用 TableDataWriteChannel 将数据流式传输到 BigQuery 表。 请参见以下示例(其中JSON_CONTENT 是 JSON 字符串):

    BigQuery bigquery = BigQueryOptions.defaultInstance().service();
    TableId tableId = TableId.of("dataset", "table");
    LoadConfiguration configuration = LoadConfiguration.builder(tableId)
        .formatOptions(FormatOptions.json())
        .build();
    try (TableDataWriteChannel channel = bigquery.writer(configuration)) {
      channel.write(
          ByteBuffer.wrap(JSON_CONTENT.getBytes(StandardCharsets.UTF_8)));
    } catch (IOException e) {
      // handle exception
    }
    

    TableDataWriteChannel 使用resumable upload 将数据流式传输到 BigQuery 表,这使其更适合大数据大文件。

    TableDataWriteChannel 也可用于流式传输本地文件:

    int chunkSize = 8 * 256 * 1024;
    BigQuery bigquery = BigQueryOptions.defaultInstance().service();
    LoadConfiguration configuration = LoadConfiguration.builder(tableId)
        .formatOptions(FormatOptions.json())
        .build();
    try (FileChannel fileChannel = FileChannel.open(Paths.get("file.json"))) {
      WriteChannel writeChannel = bigquery.writer(configuration);
      long position = 0;
      long written = fileChannel.transferTo(position, chunkSize, writeChannel);
      while (written > 0) {
        position += written;
        written = fileChannel.transferTo(position, chunkSize, writeChannel);
      }
      writeChannel.close();
    }
    

    有关 gcloud-java-bigquery 的其他示例,您可以查看 BigQueryExample

    【讨论】:

    • “可恢复上传”链接描述了不使用 BigQuery 流式摄取 API 的数据加载形式。对于 swish41ffl 的目的来说,这可能没问题。需要注意的几件事:1)这将使用您的表和项目的每日负载配额 2)在幕后执行加载作业,因此可能需要一些时间才能获得数据 3)与任何加载作业一样,正确使用应该涉及轮询作业状态以验证成功完成。
    • re:“适合大数据”——数据量在这里并不真正适用。使用的 API 实际上取决于对数据新鲜度、将数据收集到单个负载的能力(为了提高效率)、客户端基础设施等的要求。有趣的是,我们有用户每天通过流式 API。
    • “大数据的适用性”:也许我的陈述不清楚,但引用 docs 可恢复上传链接:“如果您正在传输大文件并且可能出现网络中断或其他一些传输失败率很高,例如,从移动客户端应用程序上传时。”
    【解决方案4】:
    1. 这是流式传输数据的唯一方法。大文件 documented here 可以批量加载,但为此您需要将文件移动到 GCS 并从那里发出导入作业。

      李>
    2. 嗯,答案是 BQ 连接器库通常会处理转换,至少它在 Java 和 PHP 上是这样工作的,因此您需要传递对象而不是字符串。

    【讨论】:

    • 我理解数据被表示为 Map 有其优势。但是,如果我将数据作为 JSON 字符串流式传输,我觉得应该能够按原样传递它。换句话说,我认为另外支持这种方法的连接器库将避免必须将 JSON 字符串转换为 Map(在 BQ 客户端应用程序中),然后再转换回 JSON(在 BQ 连接器库中)。
    • 找到连接器库作者并发出功能请求。
    • :) 问问题的第二部分的初衷是看我是否忽略了一些明显的东西,因为这似乎是一个非常基本的要求——我的意思是传递 JSON 字符串的能力。跨度>
    • 据我所知,没有一个图书馆这样做。所以这对你来说可能很明显。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-10-19
    • 1970-01-01
    • 2016-08-10
    • 2020-01-27
    • 2020-09-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多