【问题标题】:Best way to move data between two databases using SQLAlchemy使用 SQLAlchemy 在两个数据库之间移动数据的最佳方法
【发布时间】:2022-01-10 12:37:42
【问题描述】:

(已回答)我对此的回答在下方,希望对您有所帮助。

我对 SQLAlchemy 和 Python 很陌生,我正在寻找一些建议。我正在考虑将数据从一个 Postgres DB 移动到另一个 Postgres DB。我要移动 2000 万多条记录,我希望这项工作每天都运行。我想知道:

  1. 我应该使用 SQLAlchemy 核心还是 ORM? (我主要使用核心 到目前为止)
  2. 我目前正在使用 SQLAlchemy 版本 '1.3.23'(我应该移动到 1.4/2.x)?
  3. 如何优化插入以更快地运行? (听说可能需要启用一些标志?)

很遗憾,我无法使用 pyscopg2 复制功能,因为我没有超级用户访问数据库的权限。

我正在尝试关注别人的堆栈溢出示例:the example i am following

q = select([table_1])

proxy = conn_p.execution_options(stream_results=True).execute(q)

while 'batch not empty':  # equivalent of 'while True', but clearer
    batch = proxy.fetchmany(100000)  # 100,000 rows at a time
    
    if not batch:
        break

    for row in batch:
        ???

proxy.close()

我卡住的部分是在 for 循环中。如何将数据写入下一个数据库? 我应该使用什么功能?

这是最好的方法还是我犯了严重的错误?

我当前使用 1.4 版的代码迭代:

conn_p = create_engine(--db connection string--, echo=True)

conn_sl = create_engine(--db connection string--, echo=False)

q = select([table_1])

proxy = conn_p.execution_options(stream_results=True).execute(q)


while 'batch not empty':      
    batch = proxy.fetchmany(10000)  
    
    list1 = []

    if not batch:
        break
    
    for row in batch:
        d = dict(row.items())
        list1.append(d)    
    
    insert_stmt = table_2.insert().values(list1) 
    conn_sl.execute(insert_stmt)    


proxy.close()

仍然很慢,移动 10k 条记录大约需要 15 秒。 有什么建议吗?

【问题讨论】:

  • @GordThompson 我已经像conn_sl = create_engine(--db connection string--, echo=False, executemany_mode='values') 那样将标志添加到我的连接中,但我没有看到任何性能增强。我错过了什么吗?我只做 INSERT 没有更新或类似的事情。我只是想移动数据。
  • values_only 是 SQLAlchemy 1.4 的默认值,听起来像你想要的
  • 我更新到 1.4 并运行了这段代码(添加到我上面的问题中)。我以相同的速度移动 10k 记录需要 15 秒,而 100k 需要 2 分半钟。
  • 您是通过 WAN 还是云连接将数据推送到目标数据库?

标签: python postgresql sqlalchemy


【解决方案1】:
  1. Postgres 不允许您同时访问同一会话中的两个数据库。有一些技巧,但如果你没有管理员权限,你可能无法使用它们。这意味着需要与同一主机进行第二次连接。将数据假脱机到客户端并返回到服务器比进行服务器端复制要昂贵得多,值得为 DBA 争取为定期操作设置某种计划复制。
  2. 像这样将 ORM 元素添加到批量操作只会减慢您的速度。对于数百万行,它可能会变得虚弱。
  3. 不确定,sqlalchemy 1.4+ 有很好的查询语法,但可能对这个特定问题没有帮助
  4. 如果直接使用 SQL,提示很简单。将索引延迟到加载数据之后。一旦添加了行,您可能必须删除索引并重新创建它。提防大型交易。见this discussion on bulk loading for more depth

因此,根据statement level docs,实例化第二个连接/会话,删除索引,使用每一行的属性执行基本 INSERT 语句,然后重新创建索引。

【讨论】:

  • 感谢您的反馈。所以我从一个 Postgresql DB 拉到另一个 Postgresql DB,所以我不能做服务器端复制。我一直在玩代码,并且在 for 循环中执行以下操作。 for row in batch: d = dict(row.items()) list1.append(d) insert_stmt = table_2.insert().values(list1) conn_sl.execute(insert_stmt) 请告诉我这是好是坏。
  • 在我使用过的服务器的单个插入语句中增加list1 的大小的回报会递减。在某些时候,解析 SQL 的服务器会成为瓶颈。根据服务器配置,将有一个最佳长度,例如每次插入 1000 个值,超过该值您会减慢或失败,而不是使用更小的负载发送更多查询。您必须对设置进行基准测试才能知道。
  • 使用上面的代码。加载 140k 条记录大约需要 160 秒。我正在使用 AWS sagemaker 运行该作业,它在 ml.t2.medium(4GB 内存) 上运行,我知道它的内存有限。我确实尝试过一个更大的实例 ml.t2.xlarge(16GB 内存),一次批处理大小为 10k 条记录,加载 140k 条记录需要 180 秒。
【解决方案2】:

经过大约一周的工作、测试和优化,我得到了以下结果。

我正在从事一些 ETL 工作,但主要是将数据从一个 postgresql 数据库提升和转移到另一个数据库。我需要它在我将表格传递给它的方式上是动态的,以便我可以更轻松地移动表格。请让我知道你的想法。

schema_s = "source schema"
schema_d = "destination schema"


source_creds = ('db connection string')
destination_creds = ('db connection string')

#same as destination - I use psycopg2 because it shaved off a few seconds off the insert time
conn_ssyc = psycopg2.connect('db connection string')
cur = conn_ssyc.cursor()


#list of tables to move
tbl_to_load = [
    'table_name_1',
    'table_name_2',
    'table_name_3',
    'table_name_4'
    ]

for current_table in tbl_to_load:
    start_time_per = time.time()
    
    #create the needed engines
    conn_s = create_engine(source_creds, echo=False, echo_pool=False)
    conn_d = create_engine(destination_creds, echo=False, echo_pool=False)
    
    #get the columns in each current_table 
    df = pd.read_sql(
                "SELECT \
                   column_name\
                FROM information_schema.columns\
                    WHERE table_schema = '{}'\
                    AND table_name   = '{}'".format(schema_d, current_table), conn_d)
    cur = conn_ssyc.cursor()
    print(current_table)
    
    
    #creates a list of "%s" corresponding to the number of columns in each table. 
    #Eg table with 5 columns ("%s","%s","%s","%s","%s")

    inst_col = "("
    for i in df['column_name']:
        inst_col = inst_col + "%s,"
    inst_col = inst_col[:-1]    #removes the last comma
    inst_col = inst_col + ")"

    #defining the tables
    meta_p = MetaData(schema = schema_s)
    table_1 = Table(current_table,
                    meta_p,
                    autoload_with=conn_s
                   )


    meta_sl = MetaData(schema = schema_d)
    table_2 = Table(current_table,
                    meta_sl,
                    autoload_with=conn_d
                    )
    
    #truncate current table
    truncate_query = sqlalchemy.text("TRUNCATE TABLE {}.{}".format(schema_d, current_table))
    conn_d.execution_options(autocommit=True).execute(truncate_query)

    start_time= time.time()
    
    #steam the data
    q = select([table_1])
    proxy = conn_s.execution_options(stream_results=True).execute(q)
    
    end_time = time.time()
    total_time = end_time - start_time
    print("Time: ", total_time)
    
    batch_number = 0
    while 'batch not empty':  
        batch = proxy.fetchmany(10000)  # fetching batches of 10,000 rows at a time
        
        start_time= time.time()
        #batch_number is just to monitor progress
        batch_number = batch_number + 1
        print(batch_number)
        
        list1 = []

        if not batch:
            break

        for row in batch:
            d = dict(row.items())
            list1.append(d)    

        df = pd.DataFrame(list1)    

        tpls = [tuple(x) for x in df.to_numpy()]
        
        #Here is my insert, so far based on the table size (52 columns wide, 4.3m records, and full data size in csv is 3.5gb) I can move 10k records in 3-5 seconds per batch
        
        args_str = ','.join(cur.mogrify(inst_col, x).decode('utf-8') for x in tpls)
        cur.execute("INSERT INTO {}.{} VALUES ".format(schema_d, current_table) + args_str)
        conn_ssyc.commit()
        #4-5 sec or so for 10k
        
        end_time = time.time()
        total_time = end_time - start_time
        print("Time: ", total_time)

    cur.close()
    proxy.close()
    
    end_time = time.time()
    total_time = end_time - start_time_per
    print("Time: ", total_time)

【讨论】:

    猜你喜欢
    • 2018-09-18
    • 1970-01-01
    • 2011-09-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-11-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多