【问题标题】:Loading a CSV file into BigQuery with one column containing all empty values将 CSV 文件加载到 BigQuery 中,其中一列包含所有空值
【发布时间】:2018-12-20 11:38:50
【问题描述】:

我希望使用 Python API 将以下 file 附加到具有以下定义架构的 BigQuery 表中:

[
   {
    "name": "batsman",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "batting_team",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "bowler",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "city",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "date",
    "type": "DATE",
    "mode": "NULLABLE"
   },
   {
    "name": "delivery",
    "type": "FLOAT",
    "mode": "NULLABLE"
   },
   {
    "name": "extras",
    "type": "INTEGER",
    "mode": "NULLABLE"
   },
   {
    "name": "extras_type",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "inning",
    "type": "INTEGER",
    "mode": "NULLABLE"
   },
   {
    "name": "match_code",
    "type": "INTEGER",
    "mode": "NULLABLE"
   },
   {
    "name": "non_striker",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "player_out",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "runs",
    "type": "INTEGER",
    "mode": "NULLABLE"
   },
   {
    "name": "team1",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "team2",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "toss_decision",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "toss_winner",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "total",
    "type": "INTEGER",
    "mode": "NULLABLE"
   },
   {
    "name": "venue",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "wicket_fielders",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "wicket_kind",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "win_margin",
    "type": "INTEGER",
    "mode": "NULLABLE"
   },
   {
    "name": "win_type",
    "type": "STRING",
    "mode": "NULLABLE"
   },
   {
    "name": "winner",
    "type": "STRING",
    "mode": "NULLABLE"
   }
  ]

我用来附加到 BigQuery 的代码如下:

def insert_data_in_bq(bucketname, csv_filepath, table_id='ipl'):
    """Appends a csv to a BigQuery table."""
    client = bigquery.Client()
    dataset_id = 'cric'
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.skip_leading_rows = 1
    job_config.source_format = bigquery.SourceFormat.CSV
    # job_config.null_marker = 'NULL'
    uri = 'gs://' + bucketname + '/' + csv_filepath
    load_job = client.load_table_from_uri(uri, dataset_ref.table(table_id), 
    job_config=job_config)  # API request
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()  # Waits for table load to complete.
    print('Job finished.')
    print('Loaded {} rows.'.format(load_job.output_rows))

但是,每当我加载文件时,我都会收到一条错误消息:

BadRequest: 400 Invalid schema update. Field win_margin has changed type from INTEGER to STRING

普通文件看起来像this

我应该怎么做才能将win_margin 列保持为INTEGER,但仍然能够加载包含列的所有空行的文件?

【问题讨论】:

  • 当您说:“将 win_type 列保持为 INTEGER”时,但在您的示例文件中 win_type 是一个字符串。你是说 win_margin 吗?
  • 我的意思是win_margin。已编辑问题。

标签: python google-cloud-platform google-bigquery


【解决方案1】:

如您所见,BigQuery 不允许您向整数字段添加空值,因此您需要在创建文件或上传期间填写此字段,例如:

  1. 在构建文件时确保win_margin 不为空,将 0 或 空
  2. 如果不可能,您需要将您的 python 代码更新为 上传前更新字段值
  3. 在工作表本身中创建一个公式来填充字段
  4. 将文件上传到 BQ 中的另一个表并运行 SQL 命令将数据从一个表移动到另一个表

【讨论】:

    【解决方案2】:

    您需要指定表模式,其中明确指定win_margin 列的类型。您可以通过设置job_config.schema 字段并将job_config.autodetect 设置为False 来实现。

    以下是可用于从文件中读取架构的函数:

    def read_bigquery_schema_from_file(filepath):
        file_content = open(filepath).read()
        json_content = json.loads(file_content)
        return read_bigquery_schema_from_json_recursive(json_content)
    
    def read_bigquery_schema_from_json_recursive(json_schema):
        """
        CAUTION: Recursive function
        This method can generate BQ schemas for nested records
        """
        result = []
        for field in json_schema:
            if field.get('type').lower() == 'record' and field.get('fields'):
                schema = SchemaField(
                    name=field.get('name'),
                    field_type=field.get('type', 'STRING'),
                    mode=field.get('mode', 'NULLABLE'),
                    description=field.get('description'),
                    fields=read_bigquery_schema_from_json_recursive(field.get('fields'))
                )
            else:
                schema = SchemaField(
                    name=field.get('name'),
                    field_type=field.get('type', 'STRING'),
                    mode=field.get('mode', 'NULLABLE'),
                    description=field.get('description')
                )
            result.append(schema)
        return result
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-03-18
      • 2020-01-17
      • 1970-01-01
      • 1970-01-01
      • 2016-02-20
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多