【问题标题】:Recurrent machine learning ETL using Luigi使用 Luigi 的递归机器学习 ETL
【发布时间】:2017-05-04 04:30:07
【问题描述】:

今天,运行我编写的机器学习作业是手工完成的。我下载所需的输入文件,学习和预测事物,输出 .csv 文件,然后将其复制到数据库中。

但是,由于这是投入生产,我需要自动化所有这些过程。所需的输入文件将每月(最终更频繁地)到达提供商的 S3 存储桶中。

现在我正计划使用 Luigi 来解决这个问题。这是理想的过程:

  • 每周(或每天或每小时,只要我觉得更好)我都需要我的程序来查看 S3 存储桶中的新文件
  • 当文件到达时,我的机器学习管道被触发,并吐出一些 pandas 数据帧。
  • 之后,我需要我的程序将这些结果写入不同的数据库中

问题是,我不知道如何使用 Luigi 来自动化:

  1. 文件查看
  2. 计划任务(例如每个月)
  3. 部署它(以可重现的方式)

今天,这是我想到的管道骨架:

import luigi

from mylib import ml_algorithm
from mytools import read_s3, write_hdfs, read_hdfs, write_db, new_files, mark_as_done

class Extract(luigi.Task):
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    filename = luigi.Parameter()
    def requires(self):
        pass
    def output(self, filename):
        luigi.hdfs.HdfsTarget(self.date.strftime('data/%Y_%m_%d' + self.filename)
    def run(self):
        data = read_s3(s3_path + '/' + file)
        with self.output.open('w') as hdfs_file:
            write_hdfs(hdfs_file, data)


class Transform(luigi.Task):
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    filename = luigi.Parameter()
    def requires(self):
        return Extract(self.date, self.s3_path, self.filename)
    def output(self, filename):
        luigi.hdfs.HdfsTarget(self.date.strftime('data/results/%Y_%m_%d_' + filename)
    def run(self):
        with self.input().open('r') as inputfile:
            data = read_hdfs(inputfile)
        result = ml_algorithm(data)
        with self.output().open('w') as outputfile:
            write_hdfs(outputfile, result)
        mark_as_done(filename)



class Load(luigi.Task):
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    def requires(self):
        return [Transform(self.date, self.s3_path, filename) for filename in new_files(self.s3_path)]
    def output(self):
        # Fake DB target, just for illustrative purpose
        luigi.hdfs.DBTarget('...')
    def run(self):
        for input in self.input():
            with input.open('r') as inputfile:
                result = read_hdfs(inputfile)
            # again, just for didatic purposes
            db = self.output().connection
            write_db(db, result)

然后我会将它添加到 crontab 并简单地包装到 Docker 容器中。

问题:

  • 这是人们用来执行此操作的正确模式吗?有更好的方法吗?
  • 如果我有Transform1(取决于输入数据)和Transform2(取决于Transform1 结果)并想将这两个结果保存到不同的数据库中,如何使用 Luigi 管道来实现这一点(也在这种看文件的情况下)?
  • 人们是否为此使用了不同于 cron 的东西?
  • 如何正确容器化?

【问题讨论】:

  • 澄清一下,多步 ETL 的一个例子是:输入数据是一个用户表,每个用户都有一些特征,第一个转换可以填充包含缺失值的列,第二个转换转换可以是该用户的聚类。我想保存填充的表格和集群。

标签: python amazon-s3 machine-learning etl luigi


【解决方案1】:

您的模式看起来基本正确。我将首先使用 cron 作业调用触发Load 任务管道的脚本。看起来这个Load 任务已经验证了 S3 存储桶中是否存在新文件,但是您必须将输出更改为也是有条件的,如果无事可做,它可能是状态文件或其他文件。您也可以在更高级别的WrapperTask(没有输出)中执行此操作,仅当有新文件时才需要Load 任务。然后你可以使用这个WrapperTask 来要求两个不同的加载任务,它们分别需要你的Transform1Transform2

添加容器...我的 cron 真正调用的是一个脚本,它从 git 中提取我的最新代码,必要时构建一个新容器,然后调用 docker run。我有另一个容器一直在运行luigid。每日 docker run 使用CMD 在容器中执行一个 shell 脚本,该脚本使用当天所需的参数调用 luigi 任务。

【讨论】:

  • 但是如果 Transform1 依赖于 Transform2 呢?这样我就不能使用单个包装任务来调用两者,因为它们是依赖的。我也不明白如何有条件地更改输出。
  • 您可以将 Transform2 添加到 Transform1 的 requires() 中。即使 WrapperTask 两者都需要,luigi 也会找出正确的图表。使输出有条件有点麻烦,最好只使用 WrapperTask。
猜你喜欢
  • 2019-05-03
  • 1970-01-01
  • 2021-09-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-02-09
  • 2017-06-25
  • 2015-07-07
相关资源
最近更新 更多