【问题标题】:How to best handle data stored in different locations in Google BigQuery?如何最好地处理存储在 Google BigQuery 中不同位置的数据?
【发布时间】:2015-12-22 09:42:37
【问题描述】:

我目前在 BigQuery 中的工作流程如下:

(1) 查询公共存储库中的数据(存储在美国),(2) 将其写入我的存储库中的表,(3) 将 csv 导出到云存储桶和 (4) 将 csv 下载到我工作的服务器和(5)在服务器上使用它。

我现在遇到的问题是我工作的服务器位于欧盟。因此,我必须为在我的美国存储桶和我的欧盟服务器之间传输数据支付相当多的费用。我现在可以继续在欧盟找到我的存储桶,但我仍然遇到将数据从美国(BigQuery)传输到欧盟(存储桶)的问题。所以我也可以将我在 bq 中的数据集设置为位于欧盟,但是我不能再进行任何查询,因为公共存储库中的数据位于美国,并且不允许在不同位置之间进行查询。

有人知道如何解决这个问题吗?

【问题讨论】:

  • 数据量是多少,您想避免支付的费用大概是多少?
  • 我最终每次需要传输的csv数据量在3GB左右。但是,我经常这样做。每天总计约 1 美元。
  • 其实你在这里付多少钱?
  • 好吧,我支付此处列出的“洲际转账”:cloud.google.com/storage/pricing 我的每日金额取决于我进行的查询次数;我例如昨天有 2 美元。
  • 从无名公司获得便宜的虚拟机,但服务器位于美国,没有这样的定价,并将其用作中间步骤,这不是更容易吗?

标签: google-bigquery google-cloud-storage google-cloud-datastore


【解决方案1】:

将 BigQuery 数据集从一个区域复制到另一个区域的一种方法是利用 Storage Data Transfer Service。它没有解决您仍然必须pay for bucket-to-bucket network traffic 的事实,但可能会节省一些将数据复制到欧盟服务器的 CPU 时间。

流程将是:

  1. 将所有 BigQuery 表提取到与表位于同一区域的存储桶中。 (推荐 Avro 格式以获得最佳的数据类型保真度和最快的加载速度。)
  2. 运行存储传输作业​​,将提取的文件从起始位置存储桶复制到目标位置的存储桶。
  3. 将所有文件加载到位于目标位置的 BigQuery 数据集中。

Python 示例:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import sys
import time

import googleapiclient.discovery
from google.cloud import bigquery
import json
import pytz


PROJECT_ID = 'swast-scratch'  # TODO: set this to your project name
FROM_LOCATION = 'US'  # TODO: set this to the BigQuery location
FROM_DATASET = 'workflow_test_us'  # TODO: set to BQ dataset name
FROM_BUCKET = 'swast-scratch-us'  # TODO: set to bucket name in same location
TO_LOCATION = 'EU'  # TODO: set this to the destination BigQuery location
TO_DATASET = 'workflow_test_eu'  # TODO: set to destination dataset name
TO_BUCKET = 'swast-scratch-eu'  # TODO: set to bucket name in destination loc

# Construct API clients.
bq_client = bigquery.Client(project=PROJECT_ID)
transfer_client = googleapiclient.discovery.build('storagetransfer', 'v1')


def extract_tables():
    # Extract all tables in a dataset to a Cloud Storage bucket.
    print('Extracting {}:{} to bucket {}'.format(
        PROJECT_ID, FROM_DATASET, FROM_BUCKET))

    tables = list(bq_client.list_tables(bq_client.dataset(FROM_DATASET)))
    extract_jobs = []
    for table in tables:
        job_config = bigquery.ExtractJobConfig()
        job_config.destination_format = bigquery.DestinationFormat.AVRO
        extract_job = bq_client.extract_table(
            table.reference,
            ['gs://{}/{}.avro'.format(FROM_BUCKET, table.table_id)],
            location=FROM_LOCATION,  # Available in 0.32.0 library.
            job_config=job_config)  # Starts the extract job.
        extract_jobs.append(extract_job)

    for job in extract_jobs:
        job.result()

    return tables


def transfer_buckets():
    # Transfer files from one region to another using storage transfer service.
    print('Transferring bucket {} to {}'.format(FROM_BUCKET, TO_BUCKET))
    now = datetime.datetime.now(pytz.utc)
    transfer_job = {
        'description': '{}-{}-{}_once'.format(
            PROJECT_ID, FROM_BUCKET, TO_BUCKET),
        'status': 'ENABLED',
        'projectId': PROJECT_ID,
        'transferSpec': {
            'transferOptions': {
                'overwriteObjectsAlreadyExistingInSink': True,
            },
            'gcsDataSource': {
                'bucketName': FROM_BUCKET,
            },
            'gcsDataSink': {
                'bucketName': TO_BUCKET,
            },
        },
        # Set start and end date to today (UTC) without a time part to start
        # the job immediately.
        'schedule': {
            'scheduleStartDate': {
                'year': now.year,
                'month': now.month,
                'day': now.day,
            },
            'scheduleEndDate': {
                'year': now.year,
                'month': now.month,
                'day': now.day,
            },
        },
    }
    transfer_job = transfer_client.transferJobs().create(
        body=transfer_job).execute()
    print('Returned transferJob: {}'.format(
        json.dumps(transfer_job, indent=4)))

    # Find the operation created for the job.
    job_filter = {
        'project_id': PROJECT_ID,
        'job_names': [transfer_job['name']],
    }

    # Wait until the operation has started.
    response = {}
    while ('operations' not in response) or (not response['operations']):
        time.sleep(1)
        response = transfer_client.transferOperations().list(
            name='transferOperations', filter=json.dumps(job_filter)).execute()

    operation = response['operations'][0]
    print('Returned transferOperation: {}'.format(
        json.dumps(operation, indent=4)))

    # Wait for the transfer to complete.
    print('Waiting ', end='')
    while operation['metadata']['status'] == 'IN_PROGRESS':
        print('.', end='')
        sys.stdout.flush()
        time.sleep(5)
        operation = transfer_client.transferOperations().get(
            name=operation['name']).execute()
    print()

    print('Finished transferOperation: {}'.format(
        json.dumps(operation, indent=4)))


def load_tables(tables):
    # Load all tables into the new dataset.
    print('Loading tables from bucket {} to {}:{}'.format(
        TO_BUCKET, PROJECT_ID, TO_DATASET))

    load_jobs = []
    for table in tables:
        dest_table = bq_client.dataset(TO_DATASET).table(table.table_id)
        job_config = bigquery.LoadJobConfig()
        job_config.source_format = bigquery.SourceFormat.AVRO
        load_job = bq_client.load_table_from_uri(
            ['gs://{}/{}.avro'.format(TO_BUCKET, table.table_id)],
            dest_table,
            location=TO_LOCATION,  # Available in 0.32.0 library.
            job_config=job_config)  # Starts the load job.
        load_jobs.append(load_job)

    for job in load_jobs:
        job.result()


# Actually run the script.
tables = extract_tables()
transfer_buckets()
load_tables(tables)

上述示例将 google-cloud-bigquery 库用于 BigQuery API,将 google-api-python-client 用于存储数据传输 API。

请注意,此示例不考虑分区表。

【讨论】:

【解决方案2】:

无论如何,你在美国有你在欧盟需要的数据,所以我认为你有两个选择:

  1. 您可以继续支付许多较小的费用,将减少的数据集从美国转移到欧盟。

  2. 您可以支付一次性费用,将原始公共 BQ 数据集从美国传输到您自己在欧盟的数据集。从那时起,您运行的所有查询都将停留在同一区域,并且您不再需要跨大陆转移。

这实际上取决于您计划执行多少查询。如果不是很多,那么你今天做事的方式似乎是最有效的。如果它很多,那么移动一次数据(支付前期费用)可能会更便宜。

也许 Google 有一些神奇的方法可以让这一切变得更好,但据我所知,您在大西洋的一侧处理大量数据,而您需要在另一侧处理这些数据,并将其移过该线路花钱。

【讨论】:

    猜你喜欢
    • 2010-10-01
    • 1970-01-01
    • 2023-03-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-04-03
    • 2010-09-22
    • 2016-11-26
    相关资源
    最近更新 更多