Posted: 2019-12-23 04:36:12
Question:
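My Cloud Function builds a dynamic query from certain rules, stores it in a file in Cloud Storage, and then launches my Dataflow template. I pass `inputFile` to the template as a ValueProvider that points to the file holding the query, and I then try to use that query in my pipeline through `beam.io.BigQuerySource`. However, it fails with: `ValueError: A BigQuery table or a query must be specified`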
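A likely cause is the order in which a classic template is built and run. The pipeline graph is constructed before any template parameter has a value, so a ValueProvider can only be read with `.get()` once the job is running. Calling `str()` on it gives a description of the option, not the file path; the real `RuntimeValueProvider` likewise returns a descriptive string. A `get_blob()` lookup with that name therefore finds nothing and returns `None`, and `BigQuerySource(query=None)` raises the error above. The sketch below illustrates this. `TemplateParam` is a hypothetical stand-in that only mimics the `is_accessible()`/`get()` contract of a ValueProvider; it is not Beam's class.

```python
class TemplateParam:
    """Hypothetical stand-in for a Beam ValueProvider (not the real class).

    The value is unavailable while the template is being built and is only
    supplied when the job is launched.
    """

    def __init__(self, option_name, default=None):
        self.option_name = option_name
        self.default = default
        self._value = None
        self._launched = False

    def launch(self, value):
        # Simulates the service substituting the runtime parameter at launch.
        self._value = value
        self._launched = True

    def is_accessible(self):
        return self._launched

    def get(self):
        if not self._launched:
            raise RuntimeError(
                "%s is not accessible during pipeline construction" % self.option_name)
        return self._value

    def __str__(self):
        # str() describes the option; it never returns the launch-time value.
        return "TemplateParam(option: %s, default_value: %r)" % (
            self.option_name, self.default)


input_file = TemplateParam("inputFile", default="query.txt")

# Construction time: this description is all str(inputFile) can give you.
print(str(input_file))             # TemplateParam(option: inputFile, default_value: 'query.txt')
print(input_file.is_accessible())  # False

# Run time: only now does .get() return the path sent by the Cloud Function.
input_file.launch("gs://test-bucket/some_file.csv")
print(input_file.get())            # gs://test-bucket/some_file.csv
```

In a real pipeline, the `.get()` call therefore belongs in code that runs on the workers, such as a DoFn's `process()` method. Recent Beam releases also document `beam.io.ReadFromBigQuery`'s `query` argument as accepting a ValueProvider. If the Cloud Function passed the query text itself as the template parameter, that could remove the need to read a file at all, but check the documentation for your Beam version.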
Part of the Cloud Function code:
```python
import datetime

# client, query, job_config, unit, dataset_id, table_name, service, PROJECT,
# JOBNAME, BUCKET and GCSPATH are defined elsewhere in the function (not shown).
query_job = client.query(
    query,
    job_config=job_config)
query_job.result()  # Waits for the query to write its results to the destination table
print('Query results loaded to table {}'.format(table_ref.path))

file_name = '{}_RM_{}.csv'.format(unit, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')[:-3])
destination_uri = "gs://test-bucket/{}".format(file_name)
dataset_ref = client.dataset(dataset_id, project=PROJECT)
table_ref = dataset_ref.table(table_name)
extract_job = client.extract_table(
    table_ref,
    destination_uri)
extract_job.result()  # Extracts the results to GCS
client.delete_table(table_ref)  # Deletes the table in BQ

BODY = {
    "jobName": "{jobname}".format(jobname=JOBNAME),
    "parameters": {
        "inputFile": destination_uri
    },
    "environment": {
        "tempLocation": "gs://{bucket}/temp".format(bucket=BUCKET),
        "zone": "europe-west1-b"
    }
}
request = service.projects().templates().launch(projectId=PROJECT, gcsPath=GCSPATH, body=BODY)
response = request.execute()
```
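Two pieces of the launch code can be factored out and checked in isolation. The timestamp format above puts spaces and colons into the object name. Those characters are allowed in GCS object names, but spaces need quoting in shell commands and percent-encoding in URLs, so a compact format is easier to pass around. This is a sketch under assumptions: `make_file_name` and `build_launch_body` are hypothetical helpers, the values `unit1`, `rm-job` and `my-temp-bucket` are placeholders, and the body only mirrors the `BODY` dict in the question rather than the full `templates.launch` request schema.

```python
import datetime


def make_file_name(unit, now=None):
    """Builds '<unit>_RM_<timestamp>.csv' with millisecond precision, no spaces or colons."""
    if now is None:
        now = datetime.datetime.now()
    # %f is microseconds; dropping the last 3 digits leaves milliseconds.
    stamp = now.strftime('%Y%m%dT%H%M%S%f')[:-3]
    return '{}_RM_{}.csv'.format(unit, stamp)


def build_launch_body(job_name, input_uri, temp_bucket, zone='europe-west1-b'):
    """Request body for templates().launch(), mirroring BODY in the question."""
    return {
        'jobName': job_name,
        'parameters': {
            # Key must match the name registered with add_value_provider_argument.
            'inputFile': input_uri,
        },
        'environment': {
            'tempLocation': 'gs://{}/temp'.format(temp_bucket),
            'zone': zone,
        },
    }


name = make_file_name('unit1', datetime.datetime(2019, 12, 23, 4, 36, 12, 123456))
print(name)  # unit1_RM_20191223T043612123.csv
body = build_launch_body('rm-job', 'gs://test-bucket/' + name, 'my-temp-bucket')
print(body['parameters']['inputFile'])  # gs://test-bucket/unit1_RM_20191223T043612123.csv
```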
Dataflow code:
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Template parameter: its value is supplied when the template is launched
        parser.add_value_provider_argument('--inputFile', default='query.txt')


class Query:
    def query_final(self, inputFile):
        from google.cloud import storage
        client = storage.Client()
        bucket = client.get_bucket('ingka-retention-test-bucket')
        # get_blob() returns a Blob object, or None if no object has this name
        blob = bucket.get_blob(str(inputFile))
        return blob


def dataflow():
    # pipeline_options is defined elsewhere (not shown)
    options = PipelineOptions.from_dictionary(pipeline_options)
    user_options = options.view_as(UserOptions)
    inputFile = user_options.inputFile
    new_query = Query()
    final_query = new_query.query_final(inputFile)
    with beam.Pipeline(options=options) as p:
        rows = p | 'Read Orders from BigQuery ' >> beam.io.Read(beam.io.BigQuerySource(query=final_query, use_standard_sql=True))
```
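There is a second mismatch in this code, separate from the construction-time issue. The Cloud Function sends a full URI (`gs://test-bucket/...`), but `query_final()` uses that value as an object name inside a different bucket (`ingka-retention-test-bucket`). Once the runtime value is available, it has to be split into a bucket and an object name before calling `get_bucket()`/`get_blob()`. The blob's contents also still have to be downloaded, for example with `download_as_string()`, or `download_as_bytes()`/`download_as_text()` in newer google-cloud-storage releases, because `get_blob()` returns a `Blob`, not its text. Below is a minimal standard-library sketch of the split. `split_gcs_uri` is a hypothetical helper; plain string partitioning is used instead of `urllib.parse` so that `#` or `?` in an object name is not mistaken for a URL fragment or query.

```python
def split_gcs_uri(uri):
    """Splits 'gs://bucket/path/to/object' into ('bucket', 'path/to/object')."""
    prefix = 'gs://'
    if not uri.startswith(prefix):
        raise ValueError('not a gs:// URI: %r' % uri)
    # Everything up to the first '/' after the prefix is the bucket name;
    # the rest, unchanged, is the object name.
    bucket, _, name = uri[len(prefix):].partition('/')
    if not bucket or not name:
        raise ValueError('expected gs://bucket/object, got %r' % uri)
    return bucket, name


# Works with the original timestamp format, spaces and colons included.
bucket_name, object_name = split_gcs_uri('gs://test-bucket/unit1_RM_2019-12-23 04:36:12:123.csv')
print(bucket_name)  # test-bucket
print(object_name)  # unit1_RM_2019-12-23 04:36:12:123.csv
```

In the pipeline itself, this split (and the download) would have to run after `inputFile.get()` has a value, i.e. inside worker-side code such as a DoFn, not in `dataflow()` while the template is being built.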
What is causing this, or is there a better way to accomplish this? Thanks in advance!
Tags: python google-bigquery google-cloud-functions google-cloud-dataflow apache-beam