【问题标题】:Cloud Dataflow, PubSub & Bigquery (TableRowJsonCoder) IssuesCloud Dataflow、PubSub 和 Bigquery (TableRowJsonCoder) 问题
【发布时间】:2017-06-08 09:21:31
【问题描述】:

我正在使用 Cloud Dataflow、PubSub 和 Bigquery 来读取 JSON Pubsub 消息,使用 TableRowJsonCoder 将 JSON 转换为表格,然后将它们写入 Bigquery。

我的问题在于一致性,以下代码有时会起作用。没有错误被抛出。我确定我正在正确地向 Pubsub 主题发布消息。我也确信 Dataflow 正在阅读每条消息。我已经使用 gcloud 命令行工具对此进行了测试。

gcloud beta pubsub subscriptions pull --auto-ack SUBSCRIPTION-NAME

我有两个主题订阅,一个由 Dataflow 读取,一个由我在终端读取。该代码还成功地将 JSON 数据格式化为表格格式,并将其写入我指定的数据集和表格,当感觉像这样时:(

我的假设是我并不真正了解发生了什么,并且我缺少与窗口有关的东西,每个窗口都应该是一条消息。

假设我发送了 50 条消息,数据流似乎只读取了大约一半的元素。这是我的第一个问题,这与元素被视为一定数量的字节或消息有关吗?我该如何解决这个问题?我正在使用 TableRowJSONCoder 读取数据。

然后似乎又出现了类似的问题,对于 X 元素,只有一小部分成功通过 Groupbykey。如果我能进一步排除故障,我对这个问题的描述会更深入。请注意,“id”字段始终是 unquie,所以我认为这与重复无关,但我可能是错的。

即使在我写此消息时,添加的元素已上升到 41,而 bigquery 的输出已上升到 12。我只是等待的时间不够长吗?我的测试数据是否太小(总是低于 100 条消息)?即使它最终保存了我所有的行,花费一个多小时来做这件事似乎也太长了。

dataflow console

The succesfully inserted data

/*
 * Copyright (C) 2015 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package com.example;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;

import java.util.ArrayList;
import java.util.List;

import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A starter example for writing Google Cloud Dataflow programs.
 *
 * <p>The example takes two strings, converts them to their upper-case
 * representation and logs them.
 *
 * <p>To run this starter example locally using DirectPipelineRunner, just
 * execute it without any additional parameters from your favorite development
 * environment.
 *
 * <p>To run this starter example using managed resource in Google Cloud
 * Platform, you should specify the following command-line options:
 *   --project=<YOUR_PROJECT_ID>
 *   --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
 *   --runner=BlockingDataflowPipelineRunner
 */
public class StarterPipeline {

  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  static final int WINDOW_SIZE = 1;  // Default window duration in minutes

  private final static String PROJECT_ID = "dataflow-project";
  private final static String PUBSUB_TOPIC = "projects/dataflow-project/topics/pub-sub-topic";
  private final static String DATASET_ID = "test_dataset";
  private final static String TABLE_ID = "test_table_version_one";


  private static TableSchema getSchema() {
      List<TableFieldSchema> fields = new ArrayList<>();
      fields.add(new TableFieldSchema().setName("id").setType("STRING"));
      fields.add(new TableFieldSchema().setName("ip").setType("STRING"));
      fields.add(new TableFieldSchema().setName("installation_id").setType("STRING"));
      fields.add(new TableFieldSchema().setName("user_id").setType("STRING"));
      fields.add(new TableFieldSchema().setName("device_type").setType("STRING"));
      fields.add(new TableFieldSchema().setName("language").setType("STRING"));
      fields.add(new TableFieldSchema().setName("application_id").setType("STRING"));
      fields.add(new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"));
      TableSchema schema = new TableSchema().setFields(fields);
      return schema;
    }

  private static TableReference getTableReference() {
      TableReference tableRef = new TableReference();
      tableRef.setProjectId(PROJECT_ID);
      tableRef.setDatasetId(DATASET_ID);
      tableRef.setTableId(TABLE_ID);
      return tableRef;
    }

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    dataflowOptions.setStreaming(true);
    Pipeline pipeline = Pipeline.create(dataflowOptions);
    LOG.info("Reading from PubSub.");
    PCollection<TableRow> input = pipeline
        .apply(PubsubIO.Read.topic(PUBSUB_TOPIC).withCoder(TableRowJsonCoder.of()))
            .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));
    input
         .apply(BigQueryIO.Write.to(getTableReference()).withSchema(getSchema()));

    pipeline.run();
  }
}

我还感兴趣的是将时间戳和记录 ID 指定为“时间戳”和“id”字段。

【问题讨论】:

  • 这确实应该快得多。如果项目中的网络设置配置错误,我之前已经看到过这种情况。您能否提供您工作的 job_id,以便我进一步调查?有关时间戳/id 问题,请参阅cloud.google.com/dataflow/model/…
  • @danielm 2017-01-23_09_48_10-1670593411236141809,请注意上面的project-id不是正确的。
  • 在让管道通宵运行后,从 Pubsub 读取中添加了 63 个元素,并生成了 17 行。瓶颈再次是 GroupByKey,以及从 Pubsub 读取所需的时间。
  • 我又运行了两个作业:2017-01-24_04_10_55-11493275089556109537:其中 504 条消息已发布到 Cloud PubSub。 2017-01-24_03_53_31-13159891042380692229:其中 100 条消息已发布到 Cloud PubSub,截至目前,这项工作正在流失。但也需要很长时间。到目前为止,我运行的每个作业的系统延迟都非常接近运行时间。
  • 作为一个控件,我运行了窗口字数示例 (2017-01-24_06_20_37-11246067058927218114)。它显示出与之前运行类似的行为。 Pubsub 没有在整个集合中阅读。当 GroupByKey 出现时,就会形成瓶颈。它现在已经运行了 10 分钟,没有任何元素退出 GroupByKey。文件注入器代码似乎有 5,525 个元素。在 windowed-wordcount 示例中,PubSub 仅读取 1,325 个元素。在批处理模式下,一切似乎都正常工作(2017-01-19_10_20_49-18373131302606291153)。

标签: java google-app-engine google-bigquery google-cloud-dataflow google-cloud-pubsub


【解决方案1】:

问题在于 GCE 虚拟机的网络配置错误。 Dataflow 要求 VM 能够通过 TCP 进行通信,而您的防火墙规则不允许这样做。添加防火墙规则以允许虚拟机之间的 TCP 连接将解决此问题。

某些数据缓慢通过您的管道的原因是因为有时您很幸运,数据只需要在一台机器上处理。 Pubsub 最终会超时并重试消息,因此它们最终都会通过。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-12-14
    • 2020-01-23
    • 1970-01-01
    • 2019-04-21
    • 2019-03-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多