【问题标题】:Pull large amounts of data from a remote server, into a DataFrame将大量数据从远程服务器拉入 DataFrame
【发布时间】:2014-10-27 08:02:21
【问题描述】:

为了提供尽可能多的上下文/需要,我正在尝试将存储在远程 postgres 服务器 (heroku) 上的一些数据提取到 pandas DataFrame 中,使用 psycopg2 进行连接。

我对两个特定的表感兴趣,usersevents,连接工作正常,因为当拉下用户数据时

import pandas.io.sql as sql 
# [...]
users = sql.read_sql("SELECT * FROM users", conn)

等待几秒钟后,DataFrame 按预期返回。

<class 'pandas.core.frame.DataFrame'>
Int64Index: 67458 entries, 0 to 67457
Data columns (total 35 columns): [...]

然而,当试图直接从 ipython 中提取更大、更重的 events 数据时,经过很长时间,它就崩溃了:

In [11]: events = sql.read_sql("SELECT * FROM events", conn)
vagrant@data-science-toolbox:~$

当从 iPython 笔记本尝试时,我得到 Dead kernel 错误

内核已经死了,要重新启动它吗?如果不重启内核,可以保存 notebook,但在重新打开 notebook 之前,运行代码将无法运行。


更新 #1:

为了更好地了解我试图引入的 events 表的大小,以下是记录数和每个表的属性数:

In [11]: sql.read_sql("SELECT count(*) FROM events", conn)
Out[11]:
     count
0  2711453

In [12]: len(sql.read_sql("SELECT * FROM events LIMIT 1", conn).columns)
Out[12]: 18

更新 #2:

内存绝对是read_sql当前实现的瓶颈:当拉下 events 并尝试运行另一个 iPython 实例时,结果是

vagrant@data-science-toolbox:~$ sudo ipython
-bash: fork: Cannot allocate memory

更新 #3:

我首先尝试了一个 read_sql_chunked 实现,它只返回部分 DataFrame 的数组:

def read_sql_chunked(query, conn, nrows, chunksize=1000):
    start = 0
    dfs = []
    while start < nrows:
        df = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), conn)
        start += chunksize
        dfs.append(df)
        print "Events added: %s to %s of %s" % (start-chunksize, start, nrows)
    # print "concatenating dfs"
    return dfs

event_dfs = read_sql_chunked("SELECT * FROM events", conn, events_count, 100000)

这很好用,但是当尝试连接 DataFrame 时,内核又死了。
这是在为 VM 提供 2GB RAM 之后。

基于 Andy 对 read_sqlread_csv 在实现和性能上的差异的解释,接下来我尝试将记录附加到 CSV 中,然后将它们全部读入 DataFrame:

event_dfs[0].to_csv(path+'new_events.csv', encoding='utf-8')

for df in event_dfs[1:]:
    df.to_csv(path+'new_events.csv', mode='a', header=False, encoding='utf-8')

再次,对 CSV 的写入成功完成 - 一个 657MB 的文件 - 但从 CSV 读取从未完成。

由于 2GB 似乎不够用,如何估计多少 RAM 才能读取 657MB 的 CSV 文件?


感觉我缺少对 DataFrames 或 psycopg2 的一些基本理解,但我被困住了,我什至无法确定瓶颈或优化的地方。

从远程(postgres)服务器提取大量数据的正确策略是什么?

【问题讨论】:

  • 作为一种体验,这很糟糕!希望我们将来可以为您服务。出于好奇,您的表有多大/有多少行?
  • @AndyHayden 已更新,为 events 表添加了记录数和每个记录的属性数。
  • 您是否需要内存中的所有数据?还是在 DataFrame 中同时只有部分数据(例如某些列)就足够了? (但除此之外,您关于数据框有多大的问题当然是合法的)
  • @joris 在这一点上,我同时使用了这两个:几个 DataFrame,其中 18 列的子集非常小,整个数据集分为 28 个部分 DataFrame。至少对于初步探索来说,拥有所有数据似乎是理想的。
  • 使用 HDF5 之类的东西(使用 pandas read_hdf/HDFStore)可以方便快速地按需查询数据子集,如果它太大而无法一次全部拉入内存(很多比 sql 快,并且可以查询子集而不是 csv)

标签: python postgresql pandas psycopg2


【解决方案1】:

这是一个可能有帮助的基本光标示例:

导入 psycopg2

请注意,我们必须导入 Psycopg2 extras 库!

导入 psycopg2.extras

导入系统

定义主(): conn_string = "host='localhost' dbname='my_database' user='postgres' 密码='secret'" ### 打印我们将用于连接的连接字符串

conn = psycopg2.connect(conn_string)

### HERE IS THE IMPORTANT PART, by specifying a name for the cursor
### psycopg2 creates a server-side cursor, which prevents all of the
### records from being downloaded at once from the server.
cursor = conn.cursor('cursor_unique_name', cursor_factory=psycopg2.extras.DictCursor)
cursor.execute('SELECT * FROM my_table LIMIT 1000')

### Because cursor objects are iterable we can just call 'for - in' on
### the cursor object and the cursor will automatically advance itself
### each iteration.
### This loop should run 1000 times, assuming there are at least 1000
### records in 'my_table'
row_count = 0
for row in cursor:
    row_count += 1
    print "row: %s    %s\n" % (row_count, row)

如果 name == "ma​​in": 主要()

【讨论】:

    【解决方案2】:

    尝试使用熊猫:

    mysql_cn = mysql.connector.connect(host='localhost', port=123, user='xyz',  passwd='****', db='xy_db')**
    
    data= pd.read_sql('SELECT * FROM table;', con=mysql_cn)
    
    mysql_cn.close()
    

    它对我有用。

    【讨论】:

      【解决方案3】:

      我怀疑这里有几个(相关的)事情在起作用,导致速度变慢:

      1. read_sql 是用 python 编写的,所以它有点慢(特别是与 read_csv 相比,它是用 cython 编写的 - 并且为了速度而精心实现!)并且它依赖于 sqlalchemy 而不是一些(可能更快)C-DBAPI。 迁移到 sqlalchmey 的动力是为了在未来更容易迁移(以及跨 sql 平台支持)。
      2. 您可能内存不足,因为内存中有太多 python 对象(这与未使用 C-DBAPI 有关),但可能会得到解决...

      我认为直接的解决方案是基于块的方法(并且有一个 feature request 可以在 pandas read_sqlread_sql_table 中原生地进行这项工作)。

      编辑:从 Pandas v0.16.2 开始,这种基于块的方法在 read_sql 中原生实现。


      由于您使用的是 postgres,因此您可以访问 LIMIT and OFFSET queries,这使得分块非常容易。 (我是否认为这些并非在所有 sql 语言中都可用?)

      首先,获取表中的行数(或estimate):

      nrows = con.execute('SELECT count(*) FROM users').fetchone()[0]  # also works with an sqlalchemy engine
      

      使用它来遍历表(为了调试,您可以添加一些打印语句以确认它正在工作/没有崩溃!)然后组合结果:

      def read_sql_chunked(query, con, nrows, chunksize=1000):
          start = 1
          dfs = []  # Note: could probably make this neater with a generator/for loop
          while start < nrows:
              df = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), con)
              dfs.append(df)
          return pd.concat(dfs, ignore_index=True)
      

      注意:这假设数据库适合内存!如果不是,您将需要处理每个块(mapreduce 样式)...或投资更多内存!

      【讨论】:

      • 内存可能是瓶颈:我正在运行一个只有默认 512M 的 VM。快速提升到 1024M,如果这不起作用,我会尝试分块读取。
      • @MariusButuc 让我知道这个解决方案的公平性/如果您有任何问题!
      • 添加了 Update #3 与我的新尝试(仍然不太成功)。
      • @MariusButuc 肯定会获得/分配更多的内存,2gb 并不是太多 imo - 它几乎肯定会在这里交换!您可以使用 pytables/HDF5 在磁盘上执行 concat ... 请参阅 pandas.pydata.org/pandas-docs/stable/io.html#table-format,但这可能还不够。
      • 它确实可以在 AWS m3.large 上运行... 7GB 的 RAM 是成功的。
      猜你喜欢
      • 1970-01-01
      • 2017-12-25
      • 1970-01-01
      • 1970-01-01
      • 2021-06-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-04-03
      相关资源
      最近更新 更多