【问题标题】:Export BigQuery Data to CSV without using Google Cloud Storage不使用 Google Cloud Storage 将 BigQuery 数据导出为 CSV
【发布时间】:2015-10-17 12:36:20
【问题描述】:

我目前正在编写一个软件,用于导出大量 BigQuery 数据并将查询结果存储为本地 CSV 文件。我使用了 Python 3 和 google 提供的客户端。我进行了配置和身份验证,但问题是,我无法在本地存储数据。每次执行时,我都会收到以下错误消息

googleapiclient.errors.HttpError: https://www.googleapis.com/bigquery/v2/projects/round-office-769/jobs?alt=json 返回“无效的提取目标 URI 'response/file-name-* .csv'。必须是有效的 Google 存储路径。">

这是我的工作配置:

def export_table(service, cloud_storage_path,
             projectId, datasetId, tableId, sqlQuery,
             export_format="CSV",
             num_retries=5):

# Generate a unique job_id so retries
# don't accidentally duplicate export
job_data = {
    'jobReference': {
        'projectId': projectId,
        'jobId': str(uuid.uuid4())
    },
    'configuration': {
        'extract': {
            'sourceTable': {
                'projectId': projectId,
                'datasetId': datasetId,
                'tableId': tableId,
            },
            'destinationUris': ['response/file-name-*.csv'],
            'destinationFormat': export_format
        },
        'query': {
            'query': sqlQuery,
        }
    }
}
return service.jobs().insert(
    projectId=projectId,
    body=job_data).execute(num_retries=num_retries)

我希望我可以只使用本地路径而不是云存储来存储数据,但我错了。

所以我的问题是:

我可以将查询的数据下载到本地(或本地数据库)还是必须使用谷歌云存储?

【问题讨论】:

标签: python google-bigquery


【解决方案1】:

您需要使用 Google Cloud Storage 进行导出作业。从 BigQuery 导出数据在 here 中进行了解释,还请检查不同路径语法的变体。

然后您可以将文件从 GCS 下载到本地存储。

Gsutil工具可以帮助您进一步将文件从GCS下载到本地机器。

本地无法一键下载,需要先导出到GCS,而不是传输到本地机器。

【讨论】:

  • 那么就没有别的办法,下载数据了吗?我问是因为目前我无法使用云存储,因为它没有激活,我没有激活权限。
【解决方案2】:

您可以在该表上运行 tabledata.list() 操作并设置“alt=csv”,这会将表的开头返回为 CSV。

【讨论】:

【解决方案3】:

您可以使用分页机制直接下载所有数据(无需通过 Google Cloud Storage 路由)。基本上,您需要为每个页面生成一个页面令牌,下载页面中的数据并对其进行迭代,直到所有数据都已下载,即没有更多令牌可用。这是Java中的示例代码,希望能阐明这个想法:

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.client.util.Data;
import com.google.api.services.bigquery.model.*;

/* your class starts here */

private String projectId = ""; /* fill in the project id here */
private String query = ""; /* enter your query here */
private Bigquery bigQuery;
private Job insert;
private TableDataList tableDataList;
private Iterator<TableRow> rowsIterator;
private List<TableRow> rows;
private long maxResults = 100000L; /* max number of rows in a page */

/* run query */
public void open() throws Exception {
    HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
    JsonFactory jsonFactory = new JacksonFactory();
    GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);
    if (credential.createScopedRequired())
        credential = credential.createScoped(BigqueryScopes.all());
    bigQuery = new Bigquery.Builder(transport, jsonFactory, credential).setApplicationName("my app").build();

    JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query);
    JobConfiguration jobConfig = new JobConfiguration().setQuery(queryConfig);
    Job job = new Job().setConfiguration(jobConfig);
    insert = bigQuery.jobs().insert(projectId, job).execute();
    JobReference jobReference = insert.getJobReference();

    while (true) {
        Job poll = bigQuery.jobs().get(projectId, jobReference.getJobId()).execute();
        String state = poll.getStatus().getState();
        if ("DONE".equals(state)) {
            ErrorProto errorResult = poll.getStatus().getErrorResult();
            if (errorResult != null)
                throw new Exception("Error running job: " + poll.getStatus().getErrors().get(0));
            break;
        }
        Thread.sleep(10000);
    }

    tableDataList = getPage();
    rows = tableDataList.getRows();
    rowsIterator = rows != null ? rows.iterator() : null;
}

/* read data row by row */
public /* your data object here */ read() throws Exception {
    if (rowsIterator == null) return null;

    if (!rowsIterator.hasNext()) {
        String pageToken = tableDataList.getPageToken();
        if (pageToken == null) return null;
        tableDataList = getPage(pageToken);
        rows = tableDataList.getRows();
        if (rows == null) return null;
        rowsIterator = rows.iterator();
    }

    TableRow row = rowsIterator.next();
    for (TableCell cell : row.getF()) {
        Object value = cell.getV();
        /* extract the data here */
    }

    /* return the data */
}

private TableDataList getPage() throws IOException {
    return getPage(null);
}

private TableDataList getPage(String pageToken) throws IOException {
    TableReference sourceTable = insert
            .getConfiguration()
            .getQuery()
            .getDestinationTable();
    if (sourceTable == null)
        throw new IllegalArgumentException("Source table not available. Please check the query syntax.");
    return bigQuery.tabledata()
            .list(projectId, sourceTable.getDatasetId(), sourceTable.getTableId())
            .setPageToken(pageToken)
            .setMaxResults(maxResults)
            .execute();
}

【讨论】:

  • 请注意,通过这种机制导出非常慢 - 每分钟 1M 行!我的网络下载速度超过 120Mbps,每行只有几列。
【解决方案4】:

另一种方法是通过 UI,一旦查询结果返回,您可以选择“下载为 CSV”按钮。

【讨论】:

    【解决方案5】:

    如果您安装了 Google BigQuery API 以及 pandas 和 pandas.io,则可以在 Jupyter 笔记本中运行 Python,查询 BQ 表,并将数据放入本地数据框中。从那里,您可以将其写入 CSV。

    【讨论】:

      【解决方案6】:

      正如Mikhail Berlyant 所说,

      BigQuery 不提供直接导出/下载查询的功能 结果到 GCS 或本地文件。

      您仍然可以使用 Web UI 只需三个步骤即可将其导出

      1. 配置查询以将结果保存在 BigQuery 表中并运行它。
      2. 将表导出到 GCS 中的存储桶。
      3. 从存储桶下载。

      为确保成本保持在较低水平,只需确保在将内容导出到 GCS 后删除表,并在将文件下载到计算机后从存储桶和存储桶中删除内容。

      步骤 1

      在 BigQuery 屏幕中,在运行查询之前转到更多 > 查询设置

      这将打开以下内容

      这里是你想要的

      • 目标:为查询结果设置目标表
      • 项目名称:选择项目。
      • 数据集名称:选择一个数据集。如果您没有,请创建并返回。
      • 表格名称:随便取一个名字(必须只包含字母、数字或下划线)。
      • 结果大小:允许较大的结果(没有大小限制)。

      然后保存它,查询被配置为保存在特定的表中。现在您可以运行查询了。

      第二步

      要将其导出到 GCP,您必须转到表并单击导出 > 导出到 GCS。

      这将打开以下屏幕

      选择 GCS 位置中,您可以定义存储桶、文件夹和文件。

      例如,您有一个名为 daria_bucket 的存储桶(只能使用小写字母、数字、连字符 (-) 和下划线 (_)。点 (.) 可用于形成一个有效的域名。)并希望将文件保存在存储桶的根目录中,名称为 test,然后你写(在选择 GCS 位置)

      daria_bucket/test.csv
      

      如果文件太大(超过 1 GB),您会收到错误消息。要修复它,您必须使用通配符将其保存在更多文件中。所以,你需要添加 *,就像那样

      daria_bucket/test*.csv
      

      这将在存储桶 daria_bucket 中将从表中提取的所有数据存储在多个名为 test000000000000, test000000000001, test000000000002, ... testX 的文件中。

      第三步

      然后转到 Storage,您会看到存储桶。

      进入其中,您会找到一个(或多个)文件。然后您可以从那里下载。

      【讨论】:

        【解决方案7】:

        使用 Python pandas 将数据从 BigQuery 表导出到 CSV 文件:

        import pandas as pd
        from google.cloud import bigquery
        
        selectQuery = """SELECT * FROM dataset-name.table-name"""
        bigqueryClient = bigquery.Client()
        df = bigqueryClient.query(selectQuery).to_dataframe()
        df.to_csv("file-name.csv", index=False)
        

        【讨论】:

          【解决方案8】:

          也许您可以使用 Google 提供的 simba odbc 驱动程序并使用任何提供 odbc 连接的工具来创建 csv。它甚至可以是 microsoft ssis,你甚至不需要编码。

          【讨论】:

            猜你喜欢
            • 2017-11-13
            • 1970-01-01
            • 2014-12-05
            • 1970-01-01
            • 2021-03-02
            • 2018-11-20
            • 2013-05-24
            • 1970-01-01
            相关资源
            最近更新 更多