【问题标题】:piping postgres COPY in python with psycopg2使用 psycopg2 在 python 中管道 postgres COPY
【发布时间】:2011-10-09 13:38:09
【问题描述】:

我正在编写一个脚本来使用 psycopg2 在同一网络上的两台机器之间复制一些数据。我正在替换一些旧的、丑陋的 bash 来进行复制

psql -c -h remote.host "COPY table TO STDOUT" | psql -c "COPY table FROM STDIN"

这似乎是最简单和most efficient 的复制方式。使用 stringIO 或临时文件在 python 中复制很容易,如下所示:

buf = StringIO()

from_curs   = from_conn.cursor()
to_curs     = to_conn.cursor()

from_curs.copy_expert("COPY table TO STDOUT", buf)
buf.seek(0, os.SEEK_SET)
to_curs.copy_expert("COPY table FROM STDIN", buf)

...但这涉及将所有数据保存到磁盘/内存中。

有没有人想出一种方法来在这样的副本中模仿 Unix 管道的行为?我似乎找不到不涉及 POpen 的 unix-pipe 对象 - 毕竟,也许最好的解决方案是只使用 POpen 和子进程。

【问题讨论】:

  • 好奇以下解决方案是否有效?

标签: python sql copy pipe psycopg2


【解决方案1】:

您必须将其中一个调用放在单独的线程中。我刚刚意识到您可以使用os.pipe(),这使得其余部分变得非常简单:

#!/usr/bin/python
import psycopg2
import os
import threading

fromdb = psycopg2.connect("dbname=from_db")
todb = psycopg2.connect("dbname=to_db")

r_fd, w_fd = os.pipe()

def copy_from():
    cur = todb.cursor()
    cur.copy_from(os.fdopen(r_fd), 'table')
    cur.close()
    todb.commit()

to_thread = threading.Thread(target=copy_from)
to_thread.start()

cur = fromdb.cursor()
write_f = os.fdopen(w_fd, 'w')
cur.copy_to(write_f, 'table')
write_f.close()   # or deadlock...

to_thread.join()

【讨论】:

  • 这是一个很好的解决方案!我很好奇为什么有必要引入一个 Thread 对象?
  • @Demitri、copy_from()copy_to() 是阻塞命令;他们在操作完成之前不会返回。如果我们在主线程中进行第一次调用,它只会等待管道上的数据,而我们将永远无法获得控制权来进行另一个调用。
  • 啊,我明白了。它仍然会阻塞新线程,但允许主线程在闲暇时为管道提供数据。谢谢。
  • 这可以用来将 SQL 命令通过管道/流式传输到 psycopg2,而不是一次性缓冲所有 SQL 命令吗?我有一种情况,raster2pgsql 流出大量 SQL 命令,这意味着要通过管道传输到 psql,或者在我的情况下是 psycopg2。在这种情况下,我有一个来自子进程调用的 STDOUT 流,它将输出 SQL 命令。
【解决方案2】:

您可以使用已子类化的双端队列来支持读写:

from collections import deque
from Exceptions import IndexError

class DequeBuffer(deque):
    def write(self, data):
        self.append(data)
    def read(self):
        try:
            return self.popleft()
        except IndexError:
            return ''

buf = DequeBuffer()

如果读取器比写入器快得多,并且表很大,deque 仍然会变大,但会比存储整个数据小。

另外,我不确定return '' 何时deque 为空是安全的,而不是重试直到它不为空,但我猜它是。让我知道它是否有效。

当您确定复制已完成时,请记住 del buf,尤其是当脚本此时不只是退出时。

【讨论】:

    猜你喜欢
    • 2018-07-09
    • 1970-01-01
    • 1970-01-01
    • 2015-07-10
    • 2011-03-30
    • 2010-12-24
    • 2014-05-09
    • 1970-01-01
    • 2012-05-22
    相关资源
    最近更新 更多