【问题标题】:How to use python to ETL between databases?如何使用python在数据库之间进行ETL?
【发布时间】:2016-01-19 22:12:42
【问题描述】:

使用 psycopg2,我可以从一个 PostgreSQL 数据库连接中的表中选择数据,并将其插入到第二个 PostgreSQL 数据库连接中的表中。

但是,我只能通过设置我想要提取的确切特征,并为我尝试插入的每一列写出单独的变量来做到这一点。

有没有人知道以下两种方法的良好做法:

  • 在数据库之间移动整个表,或
  • 迭代功能,而不必为要移动的每一列声明变量
  • 还是...?

这是我目前使用的脚本,您可以在其中查看特定功能的选择和变量的创建(它有效,但这不是一种实用的方法):

import psycopg2

connDev = psycopg2.connect("host=host1 dbname=dbname1 user=postgres password=*** ")
connQa = psycopg2.connect("host=host2 dbname=dbname2 user=postgres password=*** ")
curDev = connDev.cursor()
curQa = connQa.cursor()

sql = ('INSERT INTO "tempHoods" (nbhd_name, geom) values (%s, %s);')

curDev.execute('select cast(geom as varchar) from "CCD_Neighborhoods" where nbhd_id = 11;')
tempGeom = curDev.fetchone()

curDev.execute('select nbhd_name from "CCD_Neighborhoods" where nbhd_id = 11;')
tempName = curDev.fetchone()

data = (tempName, tempGeom)

curQa.execute (sql, data)


#commit transactions
connDev.commit()
connQa.commit()

#close connections
curDev.close()
curQa.close()
connDev.close()
connQa.close()

另一个注意事项是,python 允许显式使用 SQL 函数/数据类型转换,这对我们来说很重要,因为我们使用 GEOMETRY 数据类型。您可以在上面看到我将其转换为 TEXT,然后将其转储到源表中的现有几何列中 - 这将与 MSSQL Server 一起使用,这是地理空间社区中的一个巨大功能...

【问题讨论】:

  • 在 python 格式之间迁移数据有odo.readthedocs.org
  • 有什么原因你不想使用pg_dump 并且 使用python
  • 我认为您需要了解更多有关 psycopg 的信息。 psycopg 绝对 允许您在单个查询中返回多个列和行。看看some examples。如果您不喜欢位置元组,请参阅extras。如果您的 ETL 很简单并且您不需要强解耦或者这是一次性操作,请考虑使用外部数据包装器。 FDW 是一种 PostgreSQL 机制,可让您与远程数据库交互,甚至是非 PG 数据库。
  • @fahaddaniyal 我希望将其扩展到 MSSQL Server ... psycopg2 对 PostgreSQL 有好处,但如果 python 可以作为中介,那么有一个很好的 ETL 解决方案正在开发中......!

标签: python postgresql etl psycopg2


【解决方案1】:

在您的解决方案中(您的解决方案和您的问题的语句顺序不同)将以“sql =”开头的行和“#commit transactions”注释之前的循环更改为

sql_insert = 'INSERT INTO "tempHoods" (nbhd_id, nbhd_name, typology, notes, geom) values '
sql_values = ['(%s, %s, %s, %s, %s)']


data_values = []
# you can make this larger if you want
# ...try experimenting to see what works best
batch_size = 100 
sql_stmt = sql_insert + ','.join(sql_values*batch_size) + ';'
for i, row in enumerate(rows, 1):
    data_values += row[:5]
    if i % batch_size == 0:
        curQa.execute (sql_stmt , data_values )
        data_values = []
if (i % batch_size != 0):
    sql_stmt = sql_insert + ','.join(sql_values*(i % batch_size)) + ';'
    curQa.execute (sql_stmt , data_values )

顺便说一句,我认为你不需要承诺。您不开始任何交易。所以不应该有任何需要提交它们。当然,如果您所做的只是一堆选择,则不需要提交光标。

【讨论】:

  • 如此接近!我尝试在您的代码中工作,并被 pyCharm 告知要更改一些内容,但这是我迄今为止得到的 pastebin.com/R8GWCnvm 再次,据我所知,在 FOR 循环中没有做任何事情......非常感谢,德米特里!!!
  • 我一直在努力完成这项工作,虽然没有抛出任何错误,但没有任何内容提交到数据库。当我 print(sql_stmt, data_values) 我得到以下内容,我认为这是错误的...... );', [37.0, 'Hilltop', None, None]) (对于批次的第一次迭代...)
  • @mapBaker,如果您使用这些字符串参数(例如,%s 对应于37.0),您将得到与您拥有的版本相同的错误。如果你的数据实际上是一个浮点数,你应该使用%f。如果您使用%sNone 将作为None 插入到 Python 字符串中。我所做的只是将您的循环聚合到更大的插入语句中,以便运行更少的插入语句和更少的数据库往返。我没有对您尚未拥有的实际数据进行任何处理。
  • 其实,假设execute 做对了,传递None 可能就OK了。它应该被转换成NULL,但是当你指出应该期望一个字符串时传递浮点数......那个失败将完全取决于executepsycopg2中所做的事情......它可能会发生失败默默地。
  • 没有问题。但我想大部分功劳都归功于 Python 作为一种语言。我喜欢它很多年了。这并不是一个真正支持 Python 的论坛,但我要说的是,人们经常会惊讶于它可以如此紧凑和快速地完成工作。这幅漫画有很多来源,但这是我能找到的最快的:s-media-cache-ak0.pinimg.com/564x/d3/31/a2/…
【解决方案2】:

这是我基于 Dmitry 出色解决方案的更新代码:

import psycopg2

connDev = psycopg2.connect("host=host1 dbname=dpspgisdev user=postgres password=****")
connQa = psycopg2.connect("host=host2 dbname=dpspgisqa user=postgres password=****")
curDev = connDev.cursor()
curQa = connQa.cursor()

print "Truncating Source"
curQa.execute('delete from "tempHoods"')
connQa.commit()
#Get Data
curDev.execute('select  nbhd_id, nbhd_name, typology, notes, cast(geom as varchar) from "CCD_Neighborhoods";') #cast geom to varchar and insert into geometry column!
rows = curDev.fetchall()


sql_insert = 'INSERT INTO "tempHoods" (nbhd_id, nbhd_name, typology, notes, geom) values '
sql_values = ['(%s, %s, %s, %s, %s)'] #number of columns selecting / inserting


data_values = []

batch_size = 1000 #customize for size of tables... 

sql_stmt = sql_insert + ','.join(sql_values*batch_size) + ';'

for i, row in enumerate(rows, 1):

            data_values += row[:5] #relates to number of columns (%s)
            if i % batch_size == 0:
                curQa.execute (sql_stmt , data_values )
                connQa.commit()
                print "Inserting..."
                data_values = []

if (i % batch_size != 0):
    sql_stmt = sql_insert + ','.join(sql_values*(i % batch_size)) + ';'
    curQa.execute (sql_stmt, data_values)
    print "Last Values..."
    connQa.commit()





# close connections
curDev.close()
curQa.close()
connDev.close()
connQa.close()

【讨论】:

  • 如果你真的只有几行,当然可以。否则,我会为所有行使用 1 次插入。
  • @DmitryRubanovich 这是我整个过程的关键——如果你知道任何/可以指出一些如何做到这一点的例子,我会准备好的(我正在提高我的 python 技能在我去的时候,但在我到目前为止看到的任何例子中都没有遇到过......)谢谢!!!
  • 我的意思是使用插入多个值的sql语句。如“插入 TABLENAME (COLNAME1, COLNAME2, ..COLNAMEn) 值 (val, val, ..., val), (val, val, ..., val), (val, val, ..., val ), .... , (val, val, ..., val)”。您可以构造语句的开头,然后在循环中逐步建立值。并一次使用 100 或 1000(或任何您的测试显示效果最好的)值来执行它们。
  • @DmitryRubanovich 啊,这是一个很大的帮助......让我尝试一下并报告。再次感谢您!
  • @DmitryRubanovich 今天在这方面工作了太久,仍然回到我在上面的答案中发布的最后一个脚本......如果您对如何使用它有任何想法,我将不胜感激。 .. 停留在额外的 ' 和 [ 徘徊...再次,python 技能逐渐提高...感谢您可以添加的任何其他内容!
猜你喜欢
  • 2020-07-23
  • 1970-01-01
  • 2017-09-23
  • 1970-01-01
  • 1970-01-01
  • 2019-08-10
  • 1970-01-01
  • 1970-01-01
  • 2014-05-01
相关资源
最近更新 更多