【发布时间】:2020-04-05 00:12:15
【问题描述】:
我有一个 spark 脚本,需要为每一行进行 60 次 api 调用。目前我正在使用 BigQuery 作为数据仓库。我想知道是否有一种方法可以使用BigQuery API 或BigQuery Storage API 从我的udf 查询数据库?也许是一种执行批量查询的方法? pandas-gbq 会是更好的解决方案吗?我需要对每行进行的每个查询都是 select count(*) from dataset.table where {...} 查询。
目前我正在使用大查询客户端,如下面的代码 sn-p 所示,但我不确定这是否是利用我的资源的最佳方式。如果此用例的代码没有正确完成,我深表歉意,我是 spark 和 BigQuery 的新手。
def clients():
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/hadoop/credentials.json'
credentials, your_project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
# Make clients.
bqclient = bigquery.Client(
credentials=credentials,
project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=credentials
)
return bqclient, bqstorageclient
def query_cache(query):
bqclient, bqstorageclient = clients()
dataframe = (
bqclient.query(query)
.result()
.to_dataframe(bqstorage_client=bqstorageclient)
)
return dataframe['f0_'][0]
@pandas_udf(schema(), PandasUDFType.GROUPED_MAP)
def calc_counts(df):
query = "select count(*) from dataset.table where ...{some column filters}..."
df['count'] = df.apply(query_cache, args=(query), axis=1)
【问题讨论】:
标签: apache-spark pyspark google-bigquery