【Question Title】: How to partition a BigQuery table using Apache Beam in Python?
【Posted】: 2022-06-15 19:53:33
【Question Description】:

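I am writing the output of a join to a BigQuery table. The table has a date column, and I want to partition the table by that date. However, I can't find an option for specifying the field to partition on.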

I tried the following code:

additional_bq_parameters={'timePartitioning': {'type': 'DAY'}}

However, instead of partitioning on dob, it partitions by _PARTITIONTIME.
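A minimal sketch of the two shapes the timePartitioning setting can take, assuming BigQuery's documented behaviour: without a field key the table is partitioned by ingestion time (the _PARTITIONTIME pseudo-column), which matches what happened above; with a field key it is partitioned on that column. The helper make_partition_params is hypothetical, only a convenience for building the dict passed as additional_bq_parameters.

```python
def make_partition_params(partition_type="DAY", field=None):
    """Build a hypothetical additional_bq_parameters dict for WriteToBigQuery.

    With field=None, BigQuery partitions by ingestion time (_PARTITIONTIME);
    with a field name, it partitions on that DATE/TIMESTAMP/DATETIME column.
    """
    spec = {"type": partition_type}
    if field is not None:
        spec["field"] = field
    return {"timePartitioning": spec}


# Ingestion-time partitioning, as produced by the first attempt.
print(make_partition_params())            # {'timePartitioning': {'type': 'DAY'}}
# Column partitioning on the dob DATE column.
print(make_partition_params(field="dob"))  # {'timePartitioning': {'type': 'DAY', 'field': 'dob'}}
```

The type value can also be HOUR, MONTH or YEAR; DAY is kept here because it is what the question uses.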

I also tried:

additional_bq_parameters={'timePartitioning': {
        'type': 'DAY',
        'field': '_time'
    }}

But then it fails with an error.
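Note that _time does not appear in the table_schema defined in the code below, which is a likely cause of that error: for column-based time partitioning, BigQuery needs a top-level column of type DATE, TIMESTAMP or DATETIME, and dob (declared DATE) qualifies. The sketch below checks this locally before launching a pipeline. parse_schema and check_partition_field are hypothetical helpers, and they only handle the simple "name:TYPE,name:TYPE" schema string used in this question.

```python
# Column types BigQuery accepts for time-unit column partitioning.
TIME_PARTITION_TYPES = {"DATE", "TIMESTAMP", "DATETIME"}


def parse_schema(schema_str):
    """Turn a Beam-style 'name:TYPE,name:TYPE' schema string into a dict."""
    columns = {}
    for item in schema_str.split(","):
        name, col_type = item.strip().split(":")
        columns[name.strip()] = col_type.strip().upper()
    return columns


def check_partition_field(schema_str, field):
    """Raise ValueError if `field` cannot be a time-partitioning column."""
    columns = parse_schema(schema_str)
    if field not in columns:
        raise ValueError(f"partition field {field!r} is not in the schema")
    if columns[field] not in TIME_PARTITION_TYPES:
        raise ValueError(
            f"partition field {field!r} has type {columns[field]}, "
            f"expected one of {sorted(TIME_PARTITION_TYPES)}"
        )


schema = ("id:STRING,name:STRING,rank:INTEGER,dept:STRING,"
          "dob:DATE,loc:INTEGER,city:STRING")
check_partition_field(schema, "dob")  # passes: dob is a DATE column
try:
    check_partition_field(schema, "_time")
except ValueError as err:
    print(err)  # partition field '_time' is not in the schema
```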

My code:

import apache_beam as beam


def retTuple(element):
    # Split a CSV line into (key, [remaining fields]).
    fields = element.split(',')
    return (fields[0], fields[1:])


def jstr(cstr):
    # Imported here so the function stays self-contained when pickled by Beam.
    import datetime

    # cstr is (key, {'dep_data': [...], 'loc_data': [...]}) from CoGroupByKey.
    # Yield one row per (dept, location) pair; used with beam.FlatMap below.
    key, grouped = cstr
    for dep in grouped['dep_data']:
        for loc_fields in grouped['loc_data']:
            emp_id, name, rank, dept, dob, loc, city = [key] + dep + loc_fields
            yield {
                "id": emp_id,
                "name": name,
                "rank": rank,
                "dept": dept,
                # Convert d-m-Y to the YYYY-MM-DD form a BigQuery DATE column expects.
                "dob": datetime.datetime.strptime(dob, "%d-%m-%Y").strftime("%Y-%m-%d"),
                "loc": loc,
                "city": city,
            }


table_spec = 'dotted-transit-351803:test_dataflow.inner_join'
table_schema = 'id:STRING,name:STRING,rank:INTEGER,dept:STRING,dob:DATE,loc:INTEGER,city:STRING'
gcs = 'gs://dataflow4bigquery/temp/'

p1 = beam.Pipeline()

# Read the department file and key each row by employee ID.
dep_rows = (
    p1
    | "Reading File 1" >> beam.io.ReadFromText('dept_data.txt')
    | 'Pair each employee with key' >> beam.Map(retTuple)  # ('149633CM', ['Marco', '10', 'Accounts', '1-01-2019'])
)

# Read the location file and key each row by employee ID.
loc_rows = (
    p1
    | "Reading File 2" >> beam.io.ReadFromText('location.txt')
    | 'Pair each loc with key' >> beam.Map(retTuple)  # ('149633CM', ['9876843261', 'New York'])
)

results = (
    {'dep_data': dep_rows, 'loc_data': loc_rows}
    | beam.CoGroupByKey()
    | beam.FlatMap(jstr)
    | beam.io.WriteToBigQuery(
        custom_gcs_temp_location=gcs,
        table=table_spec,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        additional_bq_parameters={'timePartitioning': {
            'type': 'DAY',
            'field': 'dob'
        }}
    )
)

p1.run().wait_until_finish()
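To sanity-check the row-building step without a GCP project, here is a pure-Python sketch of what the step after CoGroupByKey receives for one key, written as a generator so that, used with beam.FlatMap, every (department, location) pair becomes its own row and a key missing on either side produces nothing (inner-join behaviour). build_rows is a hypothetical name, the sample values are taken from the comments in the code above, and casting rank and loc to int is an assumption made to match the INTEGER columns in table_schema.

```python
import datetime


def build_rows(element):
    """Yield one BigQuery row dict per (dept, location) pair for one key.

    `element` mimics what beam.CoGroupByKey() emits:
    (key, {'dep_data': [[name, rank, dept, dob], ...],
           'loc_data': [[loc, city], ...]})
    """
    key, grouped = element
    for dep in grouped["dep_data"]:
        for loc in grouped["loc_data"]:
            name, rank, dept, dob = dep
            loc_id, city = loc
            yield {
                "id": key,
                "name": name,
                "rank": int(rank),
                "dept": dept,
                # Input dates look like 1-01-2019 (day-month-year);
                # a BigQuery DATE column expects YYYY-MM-DD.
                "dob": datetime.datetime.strptime(dob, "%d-%m-%Y").strftime("%Y-%m-%d"),
                "loc": int(loc_id),
                "city": city,
            }


# Sample record modeled on the comments in the question's code.
sample = ("149633CM", {
    "dep_data": [["Marco", "10", "Accounts", "1-01-2019"]],
    "loc_data": [["9876843261", "New York"]],
})
for row in build_rows(sample):
    print(row)
```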

The error is:

RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_378_c4a563c648634e9dbbf7be3a56578b6d_bfcd0c33602b47deae6a351f72edc0cb failed. Error Result: <ErrorProto
 message: 'Incompatible table partitioning specification. Expects partitioning specification none, but input partitioning specification is interval(type:day,field:dob)'
 reason: 'invalid'> [while running 'WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs']
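The message says the destination table already exists with no partitioning ("Expects partitioning specification none"), while the load job asks for interval(type:day,field:dob). One plausible reading, not confirmed here, is that an earlier run created inner_join unpartitioned, and WRITE_TRUNCATE replaces the rows but not the table's partitioning spec; CREATE_IF_NEEDED only applies the spec when it actually creates the table. The sketch below checks this locally from the table's JSON resource (for example as printed by bq show --format=prettyjson). read_time_partitioning, partitioning_matches and the sample JSON are all hypothetical.

```python
import json


def read_time_partitioning(table_json):
    """Return the timePartitioning block of a BigQuery table resource, or None."""
    return json.loads(table_json).get("timePartitioning")


def partitioning_matches(existing, requested):
    """Compare two timePartitioning dicts by type and field.

    A missing 'field' means ingestion-time partitioning (_PARTITIONTIME);
    existing=None means the table is not partitioned at all.
    """
    def key(spec):
        if spec is None:
            return None
        return (spec["type"].upper(), spec.get("field"))
    return key(existing) == key(requested)


# Hypothetical, trimmed table resource for an unpartitioned table.
existing_json = '{"tableReference": {"tableId": "inner_join"}, "type": "TABLE"}'
requested = {"type": "DAY", "field": "dob"}

existing = read_time_partitioning(existing_json)
print(existing)                                   # None: table has no partitioning
print(partitioning_matches(existing, requested))  # False: the mismatch the error reports
```

If the check does show an unpartitioned table, one option under that assumption is to drop it or write to a new table name, so that CREATE_IF_NEEDED creates the table with the requested partitioning.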

【Question Discussion】:

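  • If you are getting errors, please include them in your question.
  • Also, please provide the complete code so we can see the context of the parameters you describe.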

Tags: python google-bigquery google-cloud-dataflow apache-beam

