【问题标题】:Luigi/SQLite: How to update database after initial load?Luigi/SQLite:初始加载后如何更新数据库?
【发布时间】:2021-02-03 14:59:49
【问题描述】:

我正在使用以下代码通过 Luigi 将数据加载到 SQLite 数据库中:

class LoadData(luigi.Task):   
    
    def requires(self):
        return TransformData()
        
    def run(self):
        with sqlite3.connect('database.db') as db:
            cursor = db.cursor()
            cursor.execute("INSERT INTO prod SELECT * FROM staging;")
    def output(self):
        return luigi.LocalTarget('database.db')

这可行,但是当我想更新或插入新数据时,任务不会执行,因为 Luigi 认为它已完成(database.db 已存在)

也许我不明白 LocalTarget 的好用处。解决这个问题的正确方法是什么?

///编辑:我的问题适用于this page 上给出的示例(le_create_db.py 的代码)。您如何解决该示例中的更新和插入问题?

///编辑:This question 关于附加到文件是类似的,但是使用标记文件的解决方案不起作用,因为 sqla 需要 SQLAlchemyTarget 输出。还有其他答案,特别是关于附加到数据库的答案吗?

【问题讨论】:

    标签: python sqlite etl luigi


    【解决方案1】:

    考虑使用模拟文件: http://gouthamanbalaraman.com/blog/building-luigi-task-pipeline.html

    在每次执行中,您都将创建一个新文件。

    另一种解决方案可能是使用在数据库中创建标记表的策略,例如:https://luigi.readthedocs.io/en/stable/api/luigi.contrib.postgres.html#luigi.contrib.postgres.PostgresTarget

    【讨论】:

    • 我读到在开发过程中建议使用模拟文件。在生产中使用它有什么缺点?
    • 让我们想象一下,您在生产中使用模拟文件有 3 个依赖任务。最后一个任务失败了,你需要从头开始。
    【解决方案2】:

    我遇到了同样的问题,并且能够通过覆盖 complete 方法以简单地返回 False 来解决它:

    def complete(self):
        return False
    

    现在任务每次都会重新运行,即使存在数据库文件。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-07-01
      • 1970-01-01
      • 2021-11-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多