【问题标题】:Run a BigQuery query and write the data into cloud storage bucket in parquet using airflow运行 BigQuery 查询并使用气流将数据写入 Parquet 中的云存储桶
【发布时间】:2021-11-24 09:04:18
【问题描述】:

我正在尝试创建一个 DAG,它将从 BigQuery 查询中提取数据并以 parquet 格式写入 gcs 存储桶。我查看了this question 并在这里得到了一些帮助。它建议使用BigQueryOperator 执行查询,然后使用BigQueryToCloudStorageOperator 写入gcs 存储桶。使用这种方法,我必须首先将查询结果写入表中,然后从该表中写入 gcs 存储桶。

分两步:

bq_query = bigquery_operator.BigQueryOperator(
    task_id='bq_query',
    sql="""
        <select query with filters>
        """.format(date=date1),
        use_legacy_sql=False,
        destination_dataset_table=<table_name>
        location="southamerica-east1",
        write_disposition="WRITE_EMPTY",
        create_disposition="CREATE_IF_NEEDED")



export_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
    task_id='export_to_gcs',
    source_project_dataset_table=destination_dataset_table,
    destination_cloud_storage_uris=[output_file],
    export_format='PARQUET')

有没有一种方法可以直接将大查询数据写入 gcs 存储桶而无需先写入表?我相信直接导出是可能的,但我正在寻找使用过滤器运行查询然后写入 gcs。

【问题讨论】:

    标签: google-bigquery airflow


    【解决方案1】:

    有可能,运营商为我们提供了一种固定的做事方式,它并不总是最佳的,但它可以节省时间。因此,其中一种方法是使用 python 运算符,该运算符具有检索 bigquery 数据并将输出上传到存储的功能。

    Python 运算符

    task = PythonOperator(
            task_id='get_data_and_upload',
            python_callable=get_bigquery_data,
            op_kwargs={'custom_date': date}
          )
    

    函数 get_bigquery_data

    # Libraries to use 
    from google.cloud import bigquery, storage 
    from google.oauth2 import service_account
    import pandas as pd
    
    # function
    def get_data_and_upload(custom_date):
        # Construct a BigQuery client object.
        key_path = "path/to/service_account.json"
    
        credentials = service_account.Credentials.from_service_account_file(
            key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],
        )
    
        client = bigquery.Client(credentials=credentials, 
        project=credentials.project_id,)
        
        #for this sample im just print it and not using it.
        print(custom_date)
    
        query = """
            SELECT name, SUM(number) as total_people
            FROM `bigquery-public-data.usa_names.usa_1910_2013`
            WHERE state = 'TX'
            GROUP BY name, state
            ORDER BY total_people DESC
            LIMIT 20
        """
        query_job = client.query(query)  
        output = []
    
        # build our result object
        for row in query_job:
            output.append({'name':row[0],'total_people':row[1]})
    
        # move to a dataframe. I use pandas for parquet conversion
        df = pd.DataFrame(output)
        bobject = df.to_parquet(path=None,compression='gzip')
    
        # upload file object to google cloud storage
        bucket_name = "my-bucket-name" 
        destination_blob_name = "parquet_files/parquet_file.gzip" 
    
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_string(bobject) 
    
        print("File uploaded to {}.".format(destination_blob_name))
    

    此外,以上示例都是一步完成的。它有效,但通常不好看,太死板了。请记住,您可以创建您认为合适的 python 函数(一个用于获取数据,一个用于转换数据,一个用于将数据推送到实际存储中)。如果你的airflow版本在2.0以上,可以使用taskflowapi传参。如果您的版本低于该版本,则必须使用XCOM

    我已经测试了该功能,我认为由于您自己的环境限制或版本控制,您应该小心地将其转换为您的气流安装,因为您可能需要更新代码或使用不同的库来获得相同的输出。

    其他选项是创建您自己的custom operator

    这里有一些有用的链接:

    【讨论】:

      猜你喜欢
      • 2020-01-25
      • 2020-03-08
      • 1970-01-01
      • 2018-06-22
      • 1970-01-01
      • 2015-08-22
      • 2021-03-26
      • 2016-08-10
      • 1970-01-01
      相关资源
      最近更新 更多