【Question title】: How to write query result to Google Cloud Storage bucket directly?
【Posted】: 2018-11-17 00:08:10
【Question】:
from google.cloud import bigquery  
query = """ select * from emp where emp_name=@emp_name""" 
query_params = [bigquery.ScalarQueryParameter('emp_name', 'STRING', 'name')] 
job_config = bigquery.QueryJobConfig() 
job_config.query_parameters = query_params  
client = bigquery.Client() 
query_job = client.query(query, job_config=job_config) 
result = query_job.result()

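How can I write the result directly to Google Cloud Storage, instead of writing it to a CSV file and then uploading that file to a Cloud Storage bucket?

【Comments】:

    Tags: python google-cloud-platform google-bigquery google-cloud-storage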


    【Solution 1】:
    from google.cloud import bigquery
    from google.oauth2 import service_account
    
    
    credentials = service_account.Credentials.from_service_account_file("dev-key.json", scopes=["https://www.googleapis.com/auth/cloud-platform"],)
    client = bigquery.Client(credentials=credentials, project=credentials.project_id,)
    
    
    bq_export_to_gs = """
    EXPORT DATA OPTIONS(
      uri='gs://my-bucket/logs/edo/dengg_audit/bq-demo/temp4/*',
      format='CSV',
      overwrite=true,
      header=false,
      field_delimiter='^') AS
    select col1 , col2 from  `project.schema.table` where clientguid = '1234' limit 10
    
    """
    
    query_job= client.query(bq_export_to_gs)
    results = query_job.result()
    for row in results:
        print(row)
    
    

    【Comments】:

      【Solution 2】:

      Solution: store BigQuery results directly in a Google Cloud Storage bucket

      from google.cloud import bigquery
      from google.cloud import storage
      
      def export_to_gcs():
          QUERY = "SELECT * FROM TABLE where CONDITION" # change the table and where condition
          bq_client = bigquery.Client()
          query_job = bq_client.query(QUERY) # BigQuery API request
          rows_df = query_job.result().to_dataframe()
          
          storage_client = storage.Client() # Storage API request
          bucket = storage_client.get_bucket(BUCKETNAME) # change the bucket name
          blob = bucket.blob('temp/Add_to_Cart.csv')
          blob.upload_from_string(rows_df.to_csv(sep=';',index=False,encoding='utf-8'),content_type='application/octet-stream')
          return "success"
      

      【Comments】:

        【Solution 3】:

        You can try this option:

        from google.cloud import bigquery
        bigqueryClient = bigquery.Client()
        uri = "gs://my-bucket/file.csv"
        tableRef = bigqueryClient.dataset("my-dataset").table("my-table")
        bqJob = bigqueryClient.extract_table(tableRef, uri)
        bqJob.result()
        

        【Comments】:

          【Solution 4】:
          #THIS IS THE CODE I AM RUNNING
          
          # Set the destination table
          for i in range(1,13):
              table_ref = client.dataset("newdataset").table("chicago_months_increment")
              job_config.destination = table_ref
              job_config.allow_large_results = True
          
              query_job = client.query('SELECT * FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` WHERE (Select EXTRACT(MONTH from trip_start_timestamp) )=i;',
                  location='US', # Location must match dataset
                  job_config=job_config)
              rows = list(query_job) # Waits for the query to finish
          
              query_job.result() 
          
              # Export table to GCS
              destination_uri = "gs://monthly-data/month-"+i+"-*.csv"
              dataset_ref = client.dataset("newdataset", project="chicago-project-247714")
              table_ref = dataset_ref.table("chicago_months_increment")
          
              extract_job = client.extract_table(
                  table_ref,
                  destination_uri,
                  location='US')
              extract_job.result()  # Waits for job to complete
              client.delete_table(table_ref) #Deletes table in BQ
          
          #ERROR I AM GETTING
          ---------------------------------------------------------------------------
          BadRequest                                Traceback (most recent call last)
          <ipython-input-5-e176648eba58> in <module>()
            9     location='US', # Location must match dataset
           10     job_config=job_config)
          ---> 11     rows = list(query_job) # Waits for the query to finish
           12 
           13 
          
          /home/amiteshwar/.local/lib/python2.7/site-packages/google/cloud/bigquery/job.pyc in __iter__(self)
             2988 
             2989 
          -> 2990         return iter(self.result())
             2991 
             2992 
          
          /home/amiteshwar/.local/lib/python2.7/site-packages/google/cloud/bigquery/job.pyc in result(self, timeout, page_size, retry)
             2875                 If the job did not complete in the given timeout.
             2876 
          -> 2877         super(QueryJob, self).result(timeout=timeout)
             2878         # Return an iterator instead of returning the job.
             2879         if not self._query_results:
          
          /home/amiteshwar/.local/lib/python2.7/site-packages/google/cloud/bigquery/job.pyc in result(self, timeout, retry)
              731 
              732         # TODO: modify PollingFuture so it can pass a retry argument to done().
          --> 733         return super(_AsyncJob, self).result(timeout=timeout)
              734 
              735     def cancel(self):
          
          /home/amiteshwar/.local/lib/python2.7/site-packages/google/api_core/future/polling.pyc in result(self, timeout)
          125             # pylint: disable=raising-bad-type
          126             # Pylint doesn't recognize that this is valid in this case.
          --> 127             raise self._exception
          128 
          129         return self._result
          
          BadRequest: 400 Unrecognized name: i at [1:125]
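
The `Unrecognized name: i` error comes from the Python loop variable `i` sitting inside the SQL string literal, so BigQuery receives the bare letter `i` and treats it as a column name. Below is a minimal sketch of one possible fix, assuming the month is sent as a named query parameter and converted to text before it is placed in the URI. The parameter name `@month` and the helpers `month_destination_uri` and `export_month` are hypothetical names chosen for illustration; the dataset, table, and bucket names are taken from the code above.

```python
MONTH_SQL = (
    "SELECT * FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` "
    "WHERE EXTRACT(MONTH FROM trip_start_timestamp) = @month"
)


def month_destination_uri(month, bucket="monthly-data"):
    """Build the wildcard GCS URI for one month (hypothetical helper)."""
    # format() converts the int to text; "month-" + 1 would raise TypeError.
    return "gs://{}/month-{}-*.csv".format(bucket, month)


def export_month(client, month, table_ref):
    """Query one month into table_ref, export it to GCS, then drop the table."""
    # Imported here so the URI helper above works without the library installed.
    from google.cloud import bigquery

    job_config = bigquery.QueryJobConfig()
    job_config.destination = table_ref
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    # The month travels as a typed parameter instead of text inside the SQL.
    job_config.query_parameters = [
        bigquery.ScalarQueryParameter("month", "INT64", month)
    ]
    client.query(MONTH_SQL, location="US", job_config=job_config).result()

    extract_job = client.extract_table(
        table_ref, month_destination_uri(month), location="US"
    )
    extract_job.result()  # Waits for the export to complete
    client.delete_table(table_ref)  # Removes the temporary table in BQ
```

With a `client` and `table_ref` created as in the code above, the loop would then reduce to calling `export_month(client, i, table_ref)` for each `i` in `range(1, 13)`.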
          

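          【Comments】:

            【Solution 5】:

            @dsesto's answer was very useful to me. I used his code and added a few extra lines to query BigQuery, write the results to a table, export that table to GCS, and load the results into a Dask DataFrame. The code is wrapped in a function.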

            def df_from_bq(query: str, table=None, compute=False):
            
                from time import gmtime, strftime
                from google.cloud import bigquery
                import dask.dataframe as dd
                import gcsfs
            
                client = bigquery.Client.from_service_account_json('YOUR_PATH') # Authentication for BQ using a service account key
                project = 'YOUR_PROJECT'
            
                table_name = 'result_'+str(strftime("%Y%m%d_%H%M%S", gmtime())) if table is None else table # Creates a custom table name if no name is defined
            
                job_config = bigquery.QueryJobConfig()
                table_ref = client.dataset("YOUR_DATASET").table(table_name)
                job_config.destination = table_ref
                job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE # Creates the table with the query result. Overwrites it if the table exists
            
                query_job = client.query(
                    query,
                    location='US', 
                    job_config=job_config)
                query_job.result() 
                print('Query results loaded to table {}'.format(table_ref.path))
            
                destination_uri = "gs://YOUR_BUCKET/{}".format(table_name+'_*'+'.csv') 
                dataset_ref = client.dataset("YOUR_DATASET", project=project)
                table_ref = dataset_ref.table(table_name)
            
                extract_job = client.extract_table(
                    table_ref,
                    destination_uri,
                    location='US') 
                extract_job.result() # Extracts the results to GCS
            
                print('Query results extracted to GCS: {}'.format(destination_uri))
            
                client.delete_table(table_ref) # Deletes the table in BQ
            
                print('Table {} deleted'.format(table_name))
            
                gcs = gcsfs.GCSFileSystem(project=project, token='cache') 
                df = dd.read_csv('gcs://YOUR_BUCKET/{}'.format(table_name+'_*'+'.csv'),  storage_options={'token': gcs.session.credentials})
            
                #storage_client = storage.Client.from_service_account_json('C:\\Users\o.korshun\Documents\o.korshun.json')
                #bucket = storage_client.get_bucket('plarium-analytics')
                #blob = bucket.blob(table_name+'.csv')
                #blob.delete() #Uncomment if you need to delete Blob after the DataFrame is created
            
                #print('Blob {} deleted'.format(table_name+'.csv'))
                print('Results imported to DD!')
            
                # reset_index(drop=True) returns a new frame with a fresh index
                return df if not compute else df.compute().reset_index(drop=True)
            

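            Note that the table in BQ is deleted once the results have been exported to Cloud Storage.

            【Comments】:

              【Solution 6】:

              Depending on your specific use case (frequency of the export, size of the exports, etc.), the solutions proposed in the answer by @GrahamPolley may work for you, although they would take more development and attention.

              The current possibilities for writing query results are either writing the results to a table or downloading them locally, and even downloading directly to CSV has some limitations. Therefore, it is not possible to write query results to GCS in CSV format directly. However, there is a two-step solution consisting of:

              1. Write query results to a BQ table
              2. Export data from a BQ table to a CSV file in GCS. Note that this feature has some limitations too, but they are not as narrow.

              The following Python code can give you an idea of how to perform that task: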

              from google.cloud import bigquery
              client = bigquery.Client()
              
              # Write query results to a new table
              job_config = bigquery.QueryJobConfig()
              table_ref = client.dataset("DATASET").table("TABLE")
              job_config.destination = table_ref
              job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
              
              query_job = client.query(
                  'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10',
                  location='US', # Location must match dataset
                  job_config=job_config)
              rows = list(query_job)  # Waits for the query to finish
              
              
              # Export table to GCS
              destination_uri = "gs://BUCKET/FILE.CSV"
              dataset_ref = client.dataset("DATASET", project="PROJECT_ID")
              table_ref = dataset_ref.table("TABLE")
              
              extract_job = client.extract_table(
                  table_ref,
                  destination_uri,
                  location='US')
              extract_job.result()  # Waits for job to complete
              

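              Note that, after that, you would have to delete the table (you can also do that programmatically). This may not be the best solution if you have to automate the process (if that is your use case, you may want to explore @Graham's solutions instead), but it will do the trick for a simple scenario.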
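
The programmatic cleanup mentioned above could look like the following sketch. It assumes `client` is a `bigquery.Client` and `table_ref` points at the temporary result table from the code above; the function name `export_and_cleanup` is hypothetical. Deleting the table in a `finally` block is a design assumption: the table goes away even when the export fails, which suits a throwaway table whose query can simply be re-run.

```python
def export_and_cleanup(client, table_ref, destination_uri, location="US"):
    """Export a temporary table to GCS and delete it afterwards."""
    try:
        extract_job = client.extract_table(
            table_ref, destination_uri, location=location
        )
        extract_job.result()  # Waits for the export to complete
    finally:
        # not_found_ok avoids an error if the table is already gone.
        client.delete_table(table_ref, not_found_ok=True)
```

It would replace the final `extract_table` call in the code above, for example `export_and_cleanup(client, table_ref, "gs://BUCKET/FILE.CSV")`.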

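              【Comments】:

              • Great example; this can also be automated on Compute Engine as a simpler alternative to a full Dataflow pipeline.
              • @bobbychowdary Yes, as described in this post, you can also use the code as the second part of your job.
              • @dsesto I get this error when using your approach: AttributeError: 'module' object has no attribute 'WriteDisposition'
              • @bobbychowdary Where are you running this code from? Which version of the google-cloud-bigquery library are you using? You can try upgrading the library to the latest version: pip install --upgrade google-cloud-bigquery. See this GitHub issue, which discusses these constants being missing from older versions of the library.
              【Solution 7】:

              BigQuery does not support writing its query results directly to GCS. You have to write the results to a table and then export the table to GCS once it has been materialized. You could use Cloud Composer to orchestrate this for you.

              Alternatively, you could use a Dataflow pipeline to achieve the desired result in one go. But this is a bit more work and will cost more money. The idea is to write a pipeline that reads from BigQuery using your SQL query and then writes the results to GCS. It will also be slower, though.

              【Comments】: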
