【问题标题】:When a new file arrives in S3, trigger luigi task当新文件到达 S3 时,触发 luigi 任务
【发布时间】:2017-04-25 17:33:16
【问题描述】:

我有一个存储桶,其中包含根据创建时间随机添加键的新对象。例如:

's3://my-bucket/mass/%s/%s/%s/%s/%s_%s.csv' % (time.strftime('%Y'), time.strftime('%m'), time.strftime('%d'), time.strftime('%H'), name, the_time)

事实上,这些都是 Scrapy 爬取的输出。我想触发一个将这些爬网与我拥有的主 .csv 产品目录文件(称为“product_catalog.csv”)相匹配的任务,该文件也会定期更新。

现在,我用全局变量编写了几个 Python 脚本,每次运行此过程时都会填写这些变量。这些需要成为导入的属性。

所以这就是需要发生的事情:

1) 新的 csv 文件显示在“s3://my-bucket/mass/...”中,具有基于抓取完成时间的唯一键名。路易吉看到了这一点并开始了。
2) “cleaning.py”由 luigi 在新文件上运行,因此“cleaning.py”的参数(在 S3 中显示的文件)需要在运行时提供给它。除了传递到下一步之外,结果还会保存在 S3 中。
3) 从数据库中提取最新版本的“product_catalog.csv”,并在“matching.py”中使用“cleaning.py”的结果

我意识到这可能并不完全合理。我将根据需要提供编辑以使其更加清晰。

编辑

根据最初的答案,我已将其配置为一个拉动操作,可在此过程中节省步骤。但现在我很迷茫。应该注意的是,这是我第一次将 Python 项目捆绑在一起,所以我在做这件事时正在学习包括 init.py 之类的东西。像往常一样,这是一条崎岖不平的道路,成功后的兴奋紧随其后的是下一个障碍的困惑。

这是我的问题:
1)我不清楚如何从 Scrapy 导入蜘蛛。我有大约十几个,目标是让 luigi 管理所有这些的抓取>清理>匹配的过程。 Scrapy 文档说包括:

class MySpider(scrapy.Spider):
    # Your spider definition

这是什么意思?在控制蜘蛛的脚本中重新编写蜘蛛?这是没有意义的,他们的例子也没有帮助。

2) 我已将 Scrapy 管道配置为导出到 S3,但 luigi 似乎也使用 output() 来执行此操作。我应该使用哪个以及如何让它们一起玩?

3) Luigi 说 CrawlTask​​() 运行成功,但这是错误的,因为它在几秒钟内完成,而爬网通常需要几分钟。也没有对应成功的输出文件。

4) 我在哪里提供 S3 的凭据?

这是我的代码。我已经注释掉了那些不能代替我认为更好的东西。但我的感觉是,我想做的事情有一个宏伟的架构,我还不明白。

import luigi
from luigi.s3 import S3Target, S3Client
import my_matching
from datetime import datetime
import os
import scrapy
from twisted.internet import reactor
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from my_crawlers.my_crawlers.spiders import my_spider

class CrawlTask(luigi.Task):
    crawltime = datetime.now()
    spider = luigi.Parameter()
    #vertical = luigi.Parameter()

    def requires(self):
        pass

    def output(self):
        return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_{}.csv".format(self.crawltime))
        #return S3Target("s3://my-bucket/mass/crawl_luigi_test_{}.csv".format(self.crawltime))

    def run(self):
        os.system("scrapy crawl %s" % self.spider)
        #process = CrawlerProcess(get_project_settings())
        #process.crawl("%s" % self.spider)
        #process.start()

class FetchPC(luigi.Task):
    vertical = luigi.Parameter()

    def output(self):
        if self.vertical == "product1":
            return "actual_data_staging/product1_catalog.csv"
        elif self.vertical == "product2":
            return "actual_data_staging/product2_catalog.csv"

class MatchTask(luigi.Task):
    crawltime = CrawlTask.crawltime
    vertical = luigi.Parameter()
    spider = luigi.Parameter()

    def requires(self):
        return CrawlTask(spider=self.spider)
        return FetchPC(vertical=self.vertical)

    def output(self):
        return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_matched_{}.csv".format(self.crawltime))
        #return S3Target("s3://my-bucket/mass/crawl_luigi_test_matched_{}.csv".format(CrawlTask.crawltime))

    def run(self):
        if self.vertical == 'product1':
            switch_board(requires.CrawlTask(), requires.FetchPC())

MatchTask 指的是我编写的一个 python 脚本,它将抓取的产品与我的产品目录进行比较。它看起来像这样:

def create_search(value):
...
def clean_column(column):
...
def color_false_positive():
...
def switch_board(scrape, product_catalog):
# this function coordinates the whole script

【问题讨论】:

    标签: python amazon-s3 scrapy luigi


    【解决方案1】:

    下面是一个非常粗略的外观轮廓。我认为关于 luigi 作为拉系统工作的主要区别在于,您首先指定所需的输出,然后触发该输出所依赖的其他任务。因此,与其在爬网结束时命名事物,不如在开始时就知道的事物命名事物更容易。也可以用另一种方式来做,只是增加了很多不必要的复杂性。

    class CrawlTask(luigi.Task):
        crawltime = luigi.DateParameter()
    
        def requires(self):
            pass
    
        def get_filename(self):
            return "s3://my-bucket/crawl_{}.csv".format(self.crawltime)
    
        def output(self):
            return S3Target(self.get_filename())
    
        def run(self):
            perform_crawl(s3_filename=self.get_filename())
    
    
    class CleanTask(luigi.Task):
        crawltime = luigi.DateParameter()
    
        def requires(self):
            return CrawlTask(crawltime=self.crawltime)
    
        def get_filename(self):
            return "s3://my-bucket/clean_crawl_{}.csv".format(self.crawltime)
    
        def output(self):
            return S3Target(self.get_filename())
    
        def run(self):
            perform_clean(input_file=self.input().path, output_filename=self.get_filename())
    
    
    class MatchTask(luigi.Task):
        crawltime = luigi.DateParameter()
    
        def requires(self):
            return CleanTask(crawltime=self.crawltime)
    
        def output(self):
            return ##?? whatever output of this task is
    
        def run(self):
            perform_match(input_file=self.input().path)
    

    【讨论】:

    • 那么更好的思考方式是我想安排每 4 小时完成一次最终结果,这将导致 luigi 触发所有要求以交付结果,而不是安排每 4 小时抓取一次,并在抓取完成后触发其他进程?
    • 那么我在哪里引入实际执行这项工作的matching.py 和cleaning.py 脚本?以及如何将 luigi 正在处理的变量(如每个步骤的结果)输入到 match.py​​ 中?本质上,这些不同的 .py 脚本应该以使用 read_csv 导入的 .csv 文件的形式将 pandas 数据帧相互传递
    • 哦,我把 perform_cleanperform_match 等放在了哪里。我以为你会导入这些脚本,而那些是我编造的方法的假名。如果没有看到该代码,很难做更多的事情。
    • @thaneofcawdor :如果需要,您可以将这些功能重写/重构为scrapy任务。或将它们导入到您编写任务的 python 脚本中。就个人而言,我将所有核心逻辑都放在单独的类中,并在需要时在 luigi 任务的 run() 中导入和使用它们。
    【解决方案2】:

    您可以做的是创建一个更大的系统来封装您的抓取和处理。这样您就不必检查 s3 是否有新对象。我以前没用过 luigi,但也许你可以把你的scrapy 工作变成一个任务,完成后做你的处理任务。无论如何,我认为“检查” s3 是否有新东西不是一个好主意,因为 1. 您将不得不使用大量 API 调用,以及 2. 您需要编写一堆代码来检查某些东西是否“新” ' 与否,这可能会变得多毛。

    【讨论】:

    • 对于这个更大的系统,您有什么建议?一切都是用 Python 编写的,如果有帮助的话,我会将它们打包到 Docker 容器中。
    • 你不能使用 Luigi 任务吗?将数据管道中的每个步骤作为一项任务实施,并在您认为合适的情况下启动任务。我以前从未使用过 Luigi——我使用 celeryproject.org celery 来处理这些东西,但它也是一个用于定义和执行任务的系统。至于使用 docker,您应该仍然可以使用 docker 完成所有这些操作,但您可能需要使用 docker 网络,并正确配置您的容器。容器是动态分配的 IP,因此您可能需要某种方式来执行“服务发现”来查找 IP。我确信 docker 已经内置了工具来做到这一点。
    • 我可以让scrapy向luigi发送一个信号作为Task.requires对象,其中包括scrapy创建的s3键吗?然后 luigi 可以从 s3 中检索该密钥。这似乎很愚蠢,但我如何让 Python 代码(爬虫和 luigi)相互交谈并传递这些信息?
    • 再说一次,我以前从未使用过 Luigi。如果你将scrapy爬虫实现为一个任务,你能从任务中返回一条消息,然后启动一个新任务吗?通过查看文档,我看到了两种可能的实现方式。 Task.output 以及 @luigi.Task.event_handler 装饰器。
    猜你喜欢
    • 1970-01-01
    • 2020-10-05
    • 1970-01-01
    • 2019-03-15
    • 1970-01-01
    • 2016-02-08
    • 2018-09-30
    • 2015-05-01
    • 1970-01-01
    相关资源
    最近更新 更多