我的第一个问题有一个解决方案,即使用以下函数:
def results_to_df(results):
    """
    Convert an Athena ``get_query_results`` response into a list of row dicts.

    Despite the name, this returns a plain list (one dict per row, keyed by
    column label), which can be passed straight to ``pd.DataFrame``.

    Args:
    - results (dict): response holding ``ResultSet.ResultSetMetadata.ColumnInfo``
      (each entry with a ``Label``) and ``ResultSet.Rows`` (each with ``Data``).

    Returns:
    - list[dict]: one ``{label: value}`` dict per row. The first row is
      skipped (presumably the header row Athena puts first -- confirm for
      non-SELECT statements).
    """
    result_set = results['ResultSet']
    columns = [
        col['Label']
        for col in result_set['ResultSetMetadata']['ColumnInfo']
    ]

    listed_results = []
    for row in result_set['Rows'][1:]:
        values = []
        for field in row['Data']:
            try:
                # Each datum is a one-entry mapping; take its only value.
                values.append(list(field.values())[0])
            except (AttributeError, IndexError):
                # Empty mapping (IndexError) or non-mapping datum
                # (AttributeError): keep the historical [' '] placeholder so
                # existing consumers see the same output as before.
                values.append([' '])
        # zip() stops at the shorter sequence, so extra values are ignored.
        listed_results.append(dict(zip(columns, values)))
    return listed_results
然后:
# Turn the response into a list of row dicts, then build a DataFrame from it.
# NOTE(review): `response` is presumably a get_query_results response -- confirm.
t = results_to_df(response)
# The resulting DataFrame is not bound to a name here.
pd.DataFrame(t)
至于我的第二个问题和@EricBellet 的请求,我还添加了我的分页方法,与在 S3 中加载 Athena 输出的结果相比,我发现这种方法效率低且时间更长:
def run_query(query, database, s3_output):
    '''
    Start an Athena query execution and return the service response.

    The execution ID is printed; callers read it from the returned dict as
    ``response['QueryExecutionId']``.
    '''
    execution_context = {'Database': database}
    result_configuration = {'OutputLocation': s3_output}

    athena = boto3.client('athena')
    started = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext=execution_context,
        ResultConfiguration=result_configuration,
    )

    execution_id = started['QueryExecutionId']
    print('Execution ID: ' + execution_id)
    return started
def format_result(results):
    '''
    Convert one page of Athena query results into a list of row dicts.

    Every row of the page is kept, including the first one, so this can be
    applied to each page of a paginated result.

    Args:
    - results (dict): page holding ``ResultSet.ResultSetMetadata.ColumnInfo``
      (each entry with a ``Label``) and ``ResultSet.Rows`` (each with ``Data``).

    Returns:
    - list[dict]: one ``{label: value}`` dict per row of the page.
    '''
    result_set = results['ResultSet']
    columns = [
        col['Label']
        for col in result_set['ResultSetMetadata']['ColumnInfo']
    ]

    formatted_results = []
    for row in result_set['Rows']:
        values = []
        for field in row['Data']:
            try:
                # Each datum is a one-entry mapping; take its only value.
                values.append(list(field.values())[0])
            except (AttributeError, IndexError):
                # Empty mapping (IndexError) or non-mapping datum
                # (AttributeError): keep the historical [' '] placeholder so
                # existing consumers see the same output as before.
                values.append([' '])
        # zip() stops at the shorter sequence, so extra values are ignored.
        formatted_results.append(dict(zip(columns, values)))
    return formatted_results
import sys
import time

import boto3
import pandas as pd

# Submit the query; the execution ID is read from the returned response.
res = run_query(query_2, database, s3_ouput)  # query Athena
query_id = res['QueryExecutionId']

# run_query keeps its Athena client local to the function, so the paging
# code below needs a client of its own (created the same way).
client = boto3.client('athena')

# NOTE(review): results are requested right after submission; presumably the
# query must have finished first -- confirm, or poll get_query_execution.
start_time = time.time()

# Let the paginator follow NextToken across all pages instead of feeding
# raw service tokens back in by hand.
paginator = client.get_paginator('get_query_results')
response_iterator = paginator.paginate(
    QueryExecutionId=query_id,
    PaginationConfig={'PageSize': 1000},
)

# NOTE(review): format_result keeps every row, so if the first page starts
# with a header row it ends up as the first data row -- confirm and drop it
# if that is not wanted.
i = 0  # number of pages fetched
page_frames = []
for i, page in enumerate(response_iterator, start=1):
    page_frames.append(pd.DataFrame(format_result(page)))

# DataFrame.append no longer exists in pandas 2.x and copied the whole frame
# on every page; concatenate once. Per-page row indices are kept as before.
formatted_results = pd.concat(page_frames) if page_frames else pd.DataFrame()

print ("My program took", time.time() - start_time, "to run")
它的格式不是很好,但我认为它可以完成工作......
2021 年更新
今天我使用 aws-data-wrangler 的自定义包装作为我几年前提出的原始问题的最佳解决方案。
import awswrangler as wr
def run_athena_query(
    query,
    database,
    s3_output,
    boto3_session=None,
    categories=None,
    chunksize=None,
    ctas_approach=None,
    profile=None,
    workgroup='myTeamName',
    region_name='us-east-1',
    keep_files=False,
    max_cache_seconds=0,
):
    """
    Run an Athena query end to end through AWS Wrangler and return the result.

    Thin wrapper around ``wr.athena.read_sql_query``; see
    https://aws-data-wrangler.readthedocs.io/en/stable/stubs/awswrangler.athena.read_sql_query.html

    Args:
    - query: SQL query.
    - database (str): AWS Glue/Athena database the query is launched from.
      Other databases can still be used by writing the full table name in
      the SQL (e.g. database.table).
    - s3_output (str, optional): Amazon S3 path.
    - boto3_session (boto3.Session, optional): session to use. If given,
      ``profile`` is ignored.
    - categories (List[str], optional): column names to return as
      pandas.Categorical. Recommended for memory-restricted environments.
    - chunksize (Union[int, bool], optional): if passed, the data is split
      into an iterable of DataFrames. True iterates by file without a
      guaranteed chunk size; an integer iterates by that number of rows.
    - ctas_approach (bool): wrap the query in a CTAS and read the resulting
      parquet data from S3. If false, read the regular CSV from S3.
    - profile (str, optional): AWS profile name; used to create a session
      only when ``boto3_session`` is None.
    - workgroup (str, optional): Athena workgroup.
    - region_name (str): region for the session created from ``profile``;
      not used otherwise.
    - keep_files (bool): whether Wrangler keeps the staging files produced
      by Athena. Default is False.
    - max_cache_seconds (int): Wrangler can reuse the result of an identical
      earlier query that completed less than this many seconds ago. Default
      is 0.

    Returns:
    - Pandas DataFrame (an iterable of DataFrames when ``chunksize`` is set).

    Raises:
    - Whatever ``wr.athena.read_sql_query`` raises; the error is printed and
      then re-raised unchanged.
    """
    # Build a session from the named profile only when the caller did not
    # supply one; identity checks avoid relying on ==/& operator overloads.
    if boto3_session is None and profile is not None:
        boto3_session = boto3.Session(profile_name=profile, region_name=region_name)

    print("Quering AWS Athena...")
    try:
        # Retrieving the data from Amazon Athena
        athena_results_df = wr.athena.read_sql_query(
            query,
            database=database,
            boto3_session=boto3_session,
            categories=categories,
            chunksize=chunksize,
            ctas_approach=ctas_approach,
            s3_output=s3_output,
            workgroup=workgroup,
            keep_files=keep_files,
            max_cache_seconds=max_cache_seconds,
        )
    except Exception as e:
        print(f"Something went wrong... the error is:{e}")
        # Bare raise keeps the original exception type and traceback, which
        # wrapping in a new Exception(e) would discard.
        raise
    else:
        print("Query completed, data retrieved successfully!")
    return athena_results_df
你可以在这里阅读更多内容。