【问题标题】:How to write data to Redshift that is a result of a dataframe created in Python?如何将作为在 Python 中创建的数据框的结果的数据写入 Redshift?
【发布时间】:2016-11-19 01:41:22
【问题描述】:

我在 Python 中有一个数据框。我可以将此数据作为新表写入 Redshift 吗? 我已成功创建到 Redshift 的数据库连接,并且能够执行简单的 sql 查询。 现在我需要给它写一个数据框。

【问题讨论】:

  • 也许你可以提供你目前必须的代码,以便于提供答案。

标签: python pandas dataframe amazon-redshift psycopg2


【解决方案1】:

鉴于所有答案都无法解决我的查询,所以我用谷歌搜索并得到了以下 sn-p,它在 2 分钟内完成了工作。我在 Windows 上使用 Python 3.8.5。

from red_panda import RedPanda
import pandas as pd
df = pd.read_csv('path_to_read_csv_file')
redshift_conf = {
    "user": "username",
    "password": "password",
    "host": "hostname",
    "port": port number in integer,
    "dbname": "dbname",
}

aws_conf = {
    "aws_access_key_id": "<access_key>",
    "aws_secret_access_key": "<secret_key>",
    # "aws_session_token": "temporary-token-if-you-have-one",
}

rp = RedPanda(redshift_conf, aws_conf)
s3_bucket = "bucketname"
s3_path = "subfolder if any" # optional, if you don't have any sub folders
s3_file_name = "filename" # optional, randomly generated if not provided
rp.df_to_redshift(df, "table_name", bucket=s3_bucket, path=s3_path, append=False)

有关更多信息,请查看 github here 上的包

【讨论】:

    【解决方案2】:

    我尝试使用 pandas df.to_sql(),但速度非常慢。我花了 10 多分钟才插入 50 行。请参阅this 未解决问题(截至撰写时)

    我尝试使用 blaze 生态系统中的 odo(根据问题讨论中的建议),但遇到了 ProgrammingError,我没有费心去调查。

    终于奏效了:

    import psycopg2
    
    # Fill in the blanks for the conn object
    conn = psycopg2.connect(user = 'user',
                                  password = 'password',
                                  host = 'host',
                                  dbname = 'db',
                                  port = 666)
    cursor = conn.cursor()
    
    # Adjust ... according to number of columns
    args_str = b','.join(cursor.mogrify("(%s,%s,...)", x) for x in tuple(map(tuple,np_data)))
    cursor.execute("insert into table (a,b,...) VALUES "+args_str.decode("utf-8"))
    
    cursor.close()
    conn.commit()
    conn.close()
    

    是的,很老的psycopg2。这是一个 numpy 数组,但从 df 转换为 ndarray 应该不会太难。这给了我大约 3k 行/分钟。

    但是,根据其他队友的建议,最快的解决方案是在将数据帧作为 TSV/CSV 转储到 S3 集群中然后复制之后使用 COPY 命令。如果您要复制非常庞大的数据集,您应该对此进行调查。 (如果我尝试了,我会在这里更新)

    【讨论】:

    • 这样插入10万行1分钟
    • 您能解释一下在args_strcursor.execute 两行中的... 位置需要放置什么吗?
    • 嗨@JonasPalačionis,这是您数据中列数的占位符。对于 2 列,它将是 (%s,%s)cursor.execute 将是 (a,b) 假设您的列被命名为 ab
    • 我在尝试这个 sol 时遇到这个错误:SyntaxError: syntax error at or near "table" LINE 1: insert into table (id,type,customer,customer_id,generation_d... ^
    【解决方案3】:

    我以前依赖pandasto_sql()函数,但是太慢了。我最近转而做以下事情:

    import pandas as pd
    import s3fs # great module which allows you to read/write to s3 easily
    import sqlalchemy
    
    df = pd.DataFrame([{'A': 'foo', 'B': 'green', 'C': 11},{'A':'bar', 'B':'blue', 'C': 20}])
    
    s3 = s3fs.S3FileSystem(anon=False)
    filename = 'my_s3_bucket_name/file.csv'
    with s3.open(filename, 'w') as f:
        df.to_csv(f, index=False, header=False)
    
    con = sqlalchemy.create_engine('postgresql://username:password@yoururl.com:5439/yourdatabase')
    # make sure the schema for mytable exists
    
    # if you need to delete the table but not the schema leave DELETE mytable
    # if you want to only append, I think just removing the DELETE mytable would work
    
    con.execute("""
        DELETE mytable;
        COPY mytable
        from 's3://%s'
        iam_role 'arn:aws:iam::xxxx:role/role_name'
        csv;""" % filename)
    
    

    该角色必须允许 Redshift 访问 S3,有关详细信息,请参阅 here

    我发现对于一个 300KB 的文件(12000x2 数据帧),这需要 4 秒,而我使用 pandas to_sql() 函数需要 8 分钟

    【讨论】:

      【解决方案4】:

      您可以使用to_sql 将数据推送到 Redshift 数据库。我已经能够通过 SQLAlchemy 引擎连接到我的数据库来做到这一点。请务必在您的to_sql 调用中设置index = False。如果该表不存在,则将创建该表,您可以指定是否要调用以替换该表、追加到该表,或者如果该表已存在则失败。

      from sqlalchemy import create_engine
      import pandas as pd
      
      conn = create_engine('postgresql://username:password@yoururl.com:5439/yourdatabase')
      
      df = pd.DataFrame([{'A': 'foo', 'B': 'green', 'C': 11},{'A':'bar', 'B':'blue', 'C': 20}])
      
      df.to_sql('your_table', conn, index=False, if_exists='replace')
      

      请注意,您可能需要pip install psycopg2 才能通过 SQLAlchemy 连接到 Redshift。

      to_sql Documentation

      【讨论】:

      • if_exists='replace' 是否适合您?它对我没有任何作用
      • 是的,如果表格已经存在,那么表格的内容将被替换为数据框的内容。
      • @Andrew,pandas 中的to_sql 方法是否利用了 Redshift 的 MPP 架构?我注意到复制 22K 行的 DF 需要一些时间
      • @lollerskates,不,据我所知没有。 Pandas 甚至不知道它正在与 Redshift 进行通信,因为您将连接/引擎对象传递给该方法。插入速度慢的一个可能原因是,pandas 在提交 SQL 命令时,并没有对 22K 条记录进行一次插入;它单独插入每条记录。我已经能够通过猴子修补熊猫来加快速度,以便它可以进行批量插入,如下所述:github.com/pandas-dev/pandas/issues/8953
      • @Andrew 我无法配置您的链接“postgresql://username:password@yoururl.com:5439/yourdatabase”您能否提供更多详细信息如何使用我的凭据替换每个元素?跨度>
      【解决方案5】:
      import pandas_redshift as pr
      
      pr.connect_to_redshift(dbname = <dbname>,
                              host = <host>,
                              port = <port>,
                              user = <user>,
                              password = <password>)
      
      pr.connect_to_s3(aws_access_key_id = <aws_access_key_id>,
                      aws_secret_access_key = <aws_secret_access_key>,
                      bucket = <bucket>,
                      subdirectory = <subdirectory>)
      
      # Write the DataFrame to S3 and then to redshift
      pr.pandas_to_redshift(data_frame = data_frame,
                              redshift_table_name = 'gawronski.nba_shots_log')
      

      详情:https://github.com/agawronski/pandas_redshift

      【讨论】:

      • 我收到此错误“当前事务已中止,命令被忽略,直到事务块结束”。对此有任何想法吗?
      • 我使用了 Red Panda 包并使用命令安装它:“pip install red-panda”。更适合我在 2 分钟内完成任务。
      【解决方案6】:

      假设您可以访问 S3,这种方法应该可行:

      第 1 步:将 DataFrame 作为 csv 写入 S3(我为此使用 AWS SDK boto3)
      第 2 步:您从 DataFrame 中知道 Redshift 表的列、数据类型和键/索引,因此您应该能够生成 create table 脚本并将其推送到 Redshift 以创建一个空表
      第 3 步:从 Python 环境向 Redshift 发送 copy 命令,将数据从 S3 复制到第 2 步中创建的空表中

      每次都像魅力一样工作。

      第 4 步:在您的云存储人员开始对您大喊大叫之前,请从 S3 中删除 csv

      如果您看到自己多次这样做,将所有四个步骤包装在一个函数中可以保持整洁。

      【讨论】:

      • 我使用相同的解决方案转储超过一百万行。我一次将它们分成 100k 行到一个 csv 文件中,然后使用 manifest 一次导入。唯一的问题是我无法压缩文件以加快复制速度。
      【解决方案7】:

      就本次谈话而言,Postgres = RedShift 你有两个选择:

      选项 1:

      来自熊猫: http://pandas.pydata.org/pandas-docs/stable/io.html#io-sql

      pandas.io.sql 模块提供了一组查询包装器,以方便数据检索并减少对特定于 DB 的 API 的依赖。数据库抽象由 SQLAlchemy 提供(如果已安装)。此外,您的数据库还需要一个驱动程序库。此类驱动程序的示例是用于 PostgreSQL 的 psycopg2 或用于 MySQL 的 pymysql。

      编写数据帧

      假设以下数据在DataFrame数据中,我们可以使用to_sql()将其插入到数据库中。

      id  Date    Col_1   Col_2   Col_3
      26  2012-10-18  X   25.7    True
      42  2012-10-19  Y   -12.4   False
      63  2012-10-20  Z   5.73    True
      
      In [437]: data.to_sql('data', engine)
      

      对于某些数据库,写入大型 DataFrame 可能会由于超出数据包大小限制而导致错误。这可以通过在调用 to_sql 时设置 chunksize 参数来避免。例如,以下内容一次将数据分批写入数据库,每行 1000 行:

      In [438]: data.to_sql('data_chunked', engine, chunksize=1000)
      

      选项 2

      或者你可以自己做 如果您有一个名为 data 的数据框,只需使用 iterrows 循环遍历它:

      for row in data.iterrows():
      

      然后将每一行添加到您的数据库中。我会为每一行使用复制而不是插入,因为它会更快。

      http://initd.org/psycopg/docs/usage.html#using-copy-to-and-copy-from

      【讨论】:

      • 感谢您的回答,但数据框会创建索引。 Redshift 不支持索引,所以不确定我们是否可以将数据作为数据帧写入 Redshift。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-28
      • 1970-01-01
      • 2023-03-09
      • 2019-01-22
      • 1970-01-01
      相关资源
      最近更新 更多