【发布时间】:2016-11-19 01:41:22
【问题描述】:
我在 Python 中有一个数据框。我可以将此数据作为新表写入 Redshift 吗? 我已成功创建到 Redshift 的数据库连接,并且能够执行简单的 sql 查询。 现在我需要给它写一个数据框。
【问题讨论】:
-
也许你可以提供你目前必须的代码,以便于提供答案。
标签: python pandas dataframe amazon-redshift psycopg2
我在 Python 中有一个数据框。我可以将此数据作为新表写入 Redshift 吗? 我已成功创建到 Redshift 的数据库连接,并且能够执行简单的 sql 查询。 现在我需要给它写一个数据框。
【问题讨论】:
标签: python pandas dataframe amazon-redshift psycopg2
鉴于所有答案都无法解决我的查询,所以我用谷歌搜索并得到了以下 sn-p,它在 2 分钟内完成了工作。我在 Windows 上使用 Python 3.8.5。
from red_panda import RedPanda
import pandas as pd
df = pd.read_csv('path_to_read_csv_file')
redshift_conf = {
"user": "username",
"password": "password",
"host": "hostname",
"port": port number in integer,
"dbname": "dbname",
}
aws_conf = {
"aws_access_key_id": "<access_key>",
"aws_secret_access_key": "<secret_key>",
# "aws_session_token": "temporary-token-if-you-have-one",
}
rp = RedPanda(redshift_conf, aws_conf)
s3_bucket = "bucketname"
s3_path = "subfolder if any" # optional, if you don't have any sub folders
s3_file_name = "filename" # optional, randomly generated if not provided
rp.df_to_redshift(df, "table_name", bucket=s3_bucket, path=s3_path, append=False)
有关更多信息,请查看 github here 上的包
【讨论】:
我尝试使用 pandas df.to_sql(),但速度非常慢。我花了 10 多分钟才插入 50 行。请参阅this 未解决问题(截至撰写时)
我尝试使用 blaze 生态系统中的 odo(根据问题讨论中的建议),但遇到了 ProgrammingError,我没有费心去调查。
终于奏效了:
import psycopg2
# Fill in the blanks for the conn object
conn = psycopg2.connect(user = 'user',
password = 'password',
host = 'host',
dbname = 'db',
port = 666)
cursor = conn.cursor()
# Adjust ... according to number of columns
args_str = b','.join(cursor.mogrify("(%s,%s,...)", x) for x in tuple(map(tuple,np_data)))
cursor.execute("insert into table (a,b,...) VALUES "+args_str.decode("utf-8"))
cursor.close()
conn.commit()
conn.close()
是的,很老的psycopg2。这是一个 numpy 数组,但从 df 转换为 ndarray 应该不会太难。这给了我大约 3k 行/分钟。
但是,根据其他队友的建议,最快的解决方案是在将数据帧作为 TSV/CSV 转储到 S3 集群中然后复制之后使用 COPY 命令。如果您要复制非常庞大的数据集,您应该对此进行调查。 (如果我尝试了,我会在这里更新)
【讨论】:
args_str 和cursor.execute 两行中的... 位置需要放置什么吗?
(%s,%s) 和 cursor.execute 将是 (a,b) 假设您的列被命名为 a 和 b。
我以前依赖pandasto_sql()函数,但是太慢了。我最近转而做以下事情:
import pandas as pd
import s3fs # great module which allows you to read/write to s3 easily
import sqlalchemy
df = pd.DataFrame([{'A': 'foo', 'B': 'green', 'C': 11},{'A':'bar', 'B':'blue', 'C': 20}])
s3 = s3fs.S3FileSystem(anon=False)
filename = 'my_s3_bucket_name/file.csv'
with s3.open(filename, 'w') as f:
df.to_csv(f, index=False, header=False)
con = sqlalchemy.create_engine('postgresql://username:password@yoururl.com:5439/yourdatabase')
# make sure the schema for mytable exists
# if you need to delete the table but not the schema leave DELETE mytable
# if you want to only append, I think just removing the DELETE mytable would work
con.execute("""
DELETE mytable;
COPY mytable
from 's3://%s'
iam_role 'arn:aws:iam::xxxx:role/role_name'
csv;""" % filename)
该角色必须允许 Redshift 访问 S3,有关详细信息,请参阅 here
我发现对于一个 300KB 的文件(12000x2 数据帧),这需要 4 秒,而我使用 pandas to_sql() 函数需要 8 分钟
【讨论】:
您可以使用to_sql 将数据推送到 Redshift 数据库。我已经能够通过 SQLAlchemy 引擎连接到我的数据库来做到这一点。请务必在您的to_sql 调用中设置index = False。如果该表不存在,则将创建该表,您可以指定是否要调用以替换该表、追加到该表,或者如果该表已存在则失败。
from sqlalchemy import create_engine
import pandas as pd
conn = create_engine('postgresql://username:password@yoururl.com:5439/yourdatabase')
df = pd.DataFrame([{'A': 'foo', 'B': 'green', 'C': 11},{'A':'bar', 'B':'blue', 'C': 20}])
df.to_sql('your_table', conn, index=False, if_exists='replace')
请注意,您可能需要pip install psycopg2 才能通过 SQLAlchemy 连接到 Redshift。
【讨论】:
if_exists='replace' 是否适合您?它对我没有任何作用
to_sql 方法是否利用了 Redshift 的 MPP 架构?我注意到复制 22K 行的 DF 需要一些时间
import pandas_redshift as pr
pr.connect_to_redshift(dbname = <dbname>,
host = <host>,
port = <port>,
user = <user>,
password = <password>)
pr.connect_to_s3(aws_access_key_id = <aws_access_key_id>,
aws_secret_access_key = <aws_secret_access_key>,
bucket = <bucket>,
subdirectory = <subdirectory>)
# Write the DataFrame to S3 and then to redshift
pr.pandas_to_redshift(data_frame = data_frame,
redshift_table_name = 'gawronski.nba_shots_log')
【讨论】:
假设您可以访问 S3,这种方法应该可行:
第 1 步:将 DataFrame 作为 csv 写入 S3(我为此使用 AWS SDK boto3)
第 2 步:您从 DataFrame 中知道 Redshift 表的列、数据类型和键/索引,因此您应该能够生成 create table 脚本并将其推送到 Redshift 以创建一个空表
第 3 步:从 Python 环境向 Redshift 发送 copy 命令,将数据从 S3 复制到第 2 步中创建的空表中
每次都像魅力一样工作。
第 4 步:在您的云存储人员开始对您大喊大叫之前,请从 S3 中删除 csv
如果您看到自己多次这样做,将所有四个步骤包装在一个函数中可以保持整洁。
【讨论】:
就本次谈话而言,Postgres = RedShift 你有两个选择:
选项 1:
来自熊猫: http://pandas.pydata.org/pandas-docs/stable/io.html#io-sql
pandas.io.sql 模块提供了一组查询包装器,以方便数据检索并减少对特定于 DB 的 API 的依赖。数据库抽象由 SQLAlchemy 提供(如果已安装)。此外,您的数据库还需要一个驱动程序库。此类驱动程序的示例是用于 PostgreSQL 的 psycopg2 或用于 MySQL 的 pymysql。
编写数据帧
假设以下数据在DataFrame数据中,我们可以使用to_sql()将其插入到数据库中。
id Date Col_1 Col_2 Col_3
26 2012-10-18 X 25.7 True
42 2012-10-19 Y -12.4 False
63 2012-10-20 Z 5.73 True
In [437]: data.to_sql('data', engine)
对于某些数据库,写入大型 DataFrame 可能会由于超出数据包大小限制而导致错误。这可以通过在调用 to_sql 时设置 chunksize 参数来避免。例如,以下内容一次将数据分批写入数据库,每行 1000 行:
In [438]: data.to_sql('data_chunked', engine, chunksize=1000)
选项 2
或者你可以自己做 如果您有一个名为 data 的数据框,只需使用 iterrows 循环遍历它:
for row in data.iterrows():
然后将每一行添加到您的数据库中。我会为每一行使用复制而不是插入,因为它会更快。
http://initd.org/psycopg/docs/usage.html#using-copy-to-and-copy-from
【讨论】: