【问题标题】:Create sql table from dask dataframe using map_partitions and pd.df.to_sql使用 map_partitions 和 pd.df.to_sql 从 dask 数据帧创建 sql 表
【发布时间】:2019-01-24 10:45:03
【问题描述】:

Dask 没有像 pandas 那样的 df.to_sql() ,所以我试图复制该功能并使用 map_partitions 方法创建一个 sql 表。这是我的代码:

import dask.dataframe as dd
import pandas as pd
import sqlalchemy_utils as sqla_utils

db_url = 'my_db_url_connection'
conn = sqla.create_engine(db_url)

ddf = dd.read_csv('data/prod.csv')
meta=dict(ddf.dtypes)
ddf.map_partitions(lambda df: df.to_sql('table_name', db_url, if_exists='append',index=True), ddf, meta=meta)

这将返回我的 dask 数据框对象,但是当我查看我的 psql 服务器时,没有新表...这里出了什么问题?

更新 仍然无法使其正常工作,但由于独立问题。追问:duplicate key value violates unique constraint - postgres error when trying to create sql table from dask dataframe

【问题讨论】:

    标签: python postgresql pandas dask pandas-to-sql


    【解决方案1】:

    简单地说,您已经创建了一个数据框,它是要完成的工作的处方,但您还没有执行它。要执行,您需要在结果上调用.compute()

    请注意,这里的输出并不是真正的数据帧,每个分区的计算结果为None(因为to_sql 没有输出),所以用df.to_delayed 表达可能会更简洁,类似于

    dto_sql = dask.delayed(pd.DataFrame.to_sql)
    out = [dto_sql(d, 'table_name', db_url, if_exists='append', index=True)
           for d in ddf.to_delayed()]
    dask.compute(*out)
    

    还要注意,能否获得良好的并行性将取决于数据库驱动程序和数据系统本身。

    【讨论】:

    • 谢谢!你的最后一句话是一个很好的观点。你知道 postgresql 是否支持并行性吗?这样做有什么意义,或者只是调用 ddf.compute().to_sql(...) 更好?我希望 dask 能加快速度,因为转换为 sql 非常慢,但现在我发现这可能是不可能的。
    【解决方案2】:

    【讨论】:

    • 您的回答没有为这个问题提供任何新的解决方案。您应该考虑将您的答案作为评论添加到已接受的答案中。
    猜你喜欢
    • 2020-07-02
    • 1970-01-01
    • 2019-01-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多