【发布时间】:2015-04-28 12:34:04
【问题描述】:
我们有大约 1500 个 sqlite db,每个表中有 0 到 20,000,000 条记录(违规),违规记录总数约为 90,000,000。
我们通过在 1500 台服务器上运行爬虫程序生成的每个文件。有了这个违规表,我们还有一些其他的表可以用于进一步分析。
为了分析结果,我们将所有这些 sqlite 违规记录推送到 postgres 违规表中,以及其他插入和其他计算。
以下是我用来传输记录的代码,
class PolicyViolationService(object):
def __init__(self, pg_dao, crawler_dao_s):
self.pg_dao = pg_dao
self.crawler_dao_s = crawler_dao_s
self.user_violation_count = defaultdict(int)
self.analyzer_time_id = self.pg_dao.get_latest_analyzer_tracker()
def process(self):
"""
transfer policy violation record from crawler db to analyzer db
"""
for crawler_dao in self.crawler_dao_s:
violations = self.get_violations(crawler_dao.get_violations())
self.pg_dao.insert_rows(violations)
def get_violations(self, violation_records):
for violation in violation_records:
violation = dict(violation.items())
violation.pop('id')
self.user_violation_count[violation.get('user_id')] += 1
violation['analyzer_time_id'] = self.analyzer_time_id
yield PolicyViolation(**violation)
in sqlite dao
==============
def get_violations(self):
result_set = self.db.execute('select * from policyviolations;')
return result_set
in pg dao
=========
def insert_rows(self, rows):
self.session.add_all(rows)
self.session.commit()
此代码有效,但需要很长时间。解决这个问题的正确方法是什么。请建议,我们一直在讨论并行处理,跳过 sqlalchemy 和其他一些选项。请建议我们正确的方式。
提前致谢!
【问题讨论】:
-
如果您使用 Python 一次只记录一条记录,您可能可以通过将类似“转储”到文本文件并将其直接导入 Postgres 来获得很大的速度。
-
@JohnZwinck 谢谢。会试试的。
标签: python postgresql sqlite sqlalchemy