【问题标题】:Streaming dataflow from Google Cloud Storage to Big Query将数据流从 Google Cloud Storage 流式传输到 Big Query
【发布时间】:2018-06-02 19:54:14
【问题描述】:

我正在尝试使用 DataFlow (Java) 将数据从 Cloud Storage 插入到 Big Query。我可以批量上传数据;但是,我想改为设置流式上传。因此,当新对象添加到我的存储桶时,它们将被推送到 BigQuery。

我已将 PipelineOptions 设置为 Streaming,它在 GCP Console UI 中显示数据流管道属于流式传输类型。我在存储桶中的初始文件/对象集被推送到 BigQuery。

但是当我向存储桶添加新对象时,这些对象不会被推送到 BigQuery。这是为什么?如何使用流式数据流管道将添加到我的 Cloud Storage 的对象推送到 BigQuery?

//Specify PipelineOptions
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);


  options.setProject(<project-name>);
  options.setStagingLocation(<bucket/staging folder>);    
  options.setStreaming(true);
  options.setRunner(DataflowRunner.class);

我的解释是,因为这是一个流式管道,所以当我将对象添加到 Cloud Storage 时,它​​们会被推送到 BigQuery。

请提出建议。

【问题讨论】:

标签: google-cloud-platform google-bigquery google-cloud-storage google-cloud-dataflow


【解决方案1】:

您如何创建输入集合?您需要有一个无限制的输入才能使流式传输管道继续运行,否则它将只是暂时的(但将使用流式插入)。 您可以通过读取包含存储桶中所有更改的订阅来实现此目的,请参阅https://cloud.google.com/storage/docs/pubsub-notifications 了解详细信息。

【讨论】:

  • 感谢您的回复。输入将是我或其他人定期上传的文件。我在想,既然我已经创建了一个流式传输管道,它只会从云存储中获取任何输入,并通过流式传输数据管道将其推送到 Pub/sub。从那里,另一个数据管道会将其传输到 BigQuery。但我明白你的意思——因为我定期手动将文件上传到云存储——它代表了一个“有界”输入。
  • 作为替代架构 - 当 Cloud Storage 存储桶发生任何更改时,我可以使用 Cloud Server 功能创建数据流管道吗?这样云服务器功能 - 数据流管道将把数据传送到 Pub/sub。从那里另一个流数据流管道将把它带到 Big Query?例如:codelabs.developers.google.com/codelabs/iot-data-pipeline/… 请参见步骤 #7。
  • 通知配置将对象元数据发送到 PubSub。如果我想将实际对象数据推送到 PubSub 怎么办?我的用例是我需要获取对象/文件,读取每一行,解析它,进行一些转换,然后将其推送到 Big Query。
  • 我注意到在 Apache Beam 2.2 中您可以查看新文件 -
猜你喜欢
  • 2020-07-09
  • 2017-12-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-01-15
  • 2020-02-23
  • 2021-11-05
  • 1970-01-01
相关资源
最近更新 更多