【问题标题】:Airflow Pipeline CSV to BigQuery with Schema Changes带有架构更改的 Airflow Pipeline CSV 到 BigQuery
【发布时间】:2020-06-30 03:14:33
【问题描述】:

背景

我需要设计一个 Airflow 管道来将 CSV 加载到 BigQuery 中。

我知道 CSV 的架构经常发生变化。加载第一个文件后,架构可能是

id | ps_1 | ps_1_value

当第二个文件登陆并加载它时,它可能看起来像

id | ps_1 | ps_1_value | ps_1 | ps_2_value.

问题

处理此问题的最佳方法是什么?


我首先想到的是

  1. 加载第二个文件
  2. 将架构与当前表进行比较
  3. 更新表,添加两列(ps_2,ps_2_value)
  4. 插入新行

我会在 PythonOperator 中执行此操作。

如果文件 3 出现并且看起来像 id | ps_2 | ps_2_value,我会填写缺失的列并进行插入。

感谢您的反馈。

【问题讨论】:

  • 你能更详细地解释你的第一步吗?您是否使用要插入的数据加载临时表?
  • 我可以将文件读入 pandas 数据帧,或者可以,将其加载到 bigquery 中的临时表中,然后从那里获取架构。

标签: google-bigquery airflow


【解决方案1】:

加载两个先前的文件example_data_1.csvexample_data_2.csv 后,我可以看到这些字段被插入到正确的列中,并根据需要添加了新列。

编辑:灯泡时刻意识到schema_update_options 的存在。见这里:https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.SchemaUpdateOption.html

csv_to_bigquery = GoogleCloudStorageToBigQueryOperator(
    task_id='csv_to_bigquery',
    google_cloud_storage_conn_id='google_cloud_default',
    bucket=airflow_bucket,
    source_objects=['data/example_data_3.csv'],
    skip_leading_rows=1,
    bigquery_conn_id='google_cloud_default',    
    destination_project_dataset_table='{}.{}.{}'.format(project, schema, table),
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    schema_update_options=['ALLOW_FIELD_RELAXATION', 'ALLOW_FIELD_ADDITION'],
    autodetect=True,
    dag=dag
)

【讨论】:

  • write_disposition='WRITE_TRUNCATE' 会起作用吗?
【解决方案2】:

基本上,为您的案例推荐的管道包括创建一个临时表来处理您的新数据。 由于AirFlow 是一种编排工具,因此不建议通过它创建大数据流。

鉴于此,您的DAG 可能与您当前的DAG 非常相似:

  1. 将新文件加载到临时表中
  2. 比较实际表的架构和临时表的架构。
  3. 运行查询以将数据从临时表移动到实际表。如果临时表有新字段,请使用参数schema_update_options 将它们添加到实际表中。除此之外,如果您的实际表有 NULLABLE 模式的字段,它可以轻松处理丢失的列,以防您的新数据有一些丢失的字段。
  4. 删除您的临时表
  5. 如果您使用的是GCS,请将您的文件移动到另一个存储桶或目录。

最后,我想指出一些可能对你有用的链接:

  1. AirFlow Documentation(BigQuery 的运算符)
  2. article 显示与您的问题类似的问题,您可以在其中找到一些提到的信息。

希望对你有帮助

【讨论】:

  • 您对schema_update_options 表示赞同,但我实际上并不需要加载到临时表中。 BigQuery 足够聪明,可以完成插入工作。谢谢您的帮助。将在下面发布完整的运算符。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2023-01-30
  • 2017-01-03
  • 1970-01-01
  • 2017-07-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多