【发布时间】:2017-04-25 17:33:16
【问题描述】:
我有一个存储桶,其中包含根据创建时间随机添加键的新对象。例如:
's3://my-bucket/mass/%s/%s/%s/%s/%s_%s.csv' % (time.strftime('%Y'), time.strftime('%m'), time.strftime('%d'), time.strftime('%H'), name, the_time)
事实上,这些都是 Scrapy 爬取的输出。我想触发一个将这些爬网与我拥有的主 .csv 产品目录文件(称为“product_catalog.csv”)相匹配的任务,该文件也会定期更新。
现在,我用全局变量编写了几个 Python 脚本,每次运行此过程时都会填写这些变量。这些需要成为导入的属性。
所以这就是需要发生的事情:
1) 新的 csv 文件显示在“s3://my-bucket/mass/...”中,具有基于抓取完成时间的唯一键名。路易吉看到了这一点并开始了。
2) “cleaning.py”由 luigi 在新文件上运行,因此“cleaning.py”的参数(在 S3 中显示的文件)需要在运行时提供给它。除了传递到下一步之外,结果还会保存在 S3 中。
3) 从数据库中提取最新版本的“product_catalog.csv”,并在“matching.py”中使用“cleaning.py”的结果
我意识到这可能并不完全合理。我将根据需要提供编辑以使其更加清晰。
编辑
根据最初的答案,我已将其配置为一个拉动操作,可在此过程中节省步骤。但现在我很迷茫。应该注意的是,这是我第一次将 Python 项目捆绑在一起,所以我在做这件事时正在学习包括 init.py 之类的东西。像往常一样,这是一条崎岖不平的道路,成功后的兴奋紧随其后的是下一个障碍的困惑。
这是我的问题:
1)我不清楚如何从 Scrapy 导入蜘蛛。我有大约十几个,目标是让 luigi 管理所有这些的抓取>清理>匹配的过程。 Scrapy 文档说包括:
class MySpider(scrapy.Spider):
# Your spider definition
这是什么意思?在控制蜘蛛的脚本中重新编写蜘蛛?这是没有意义的,他们的例子也没有帮助。
2) 我已将 Scrapy 管道配置为导出到 S3,但 luigi 似乎也使用 output() 来执行此操作。我应该使用哪个以及如何让它们一起玩?
3) Luigi 说 CrawlTask() 运行成功,但这是错误的,因为它在几秒钟内完成,而爬网通常需要几分钟。也没有对应成功的输出文件。
4) 我在哪里提供 S3 的凭据?
这是我的代码。我已经注释掉了那些不能代替我认为更好的东西。但我的感觉是,我想做的事情有一个宏伟的架构,我还不明白。
import luigi
from luigi.s3 import S3Target, S3Client
import my_matching
from datetime import datetime
import os
import scrapy
from twisted.internet import reactor
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from my_crawlers.my_crawlers.spiders import my_spider
class CrawlTask(luigi.Task):
crawltime = datetime.now()
spider = luigi.Parameter()
#vertical = luigi.Parameter()
def requires(self):
pass
def output(self):
return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_{}.csv".format(self.crawltime))
#return S3Target("s3://my-bucket/mass/crawl_luigi_test_{}.csv".format(self.crawltime))
def run(self):
os.system("scrapy crawl %s" % self.spider)
#process = CrawlerProcess(get_project_settings())
#process.crawl("%s" % self.spider)
#process.start()
class FetchPC(luigi.Task):
vertical = luigi.Parameter()
def output(self):
if self.vertical == "product1":
return "actual_data_staging/product1_catalog.csv"
elif self.vertical == "product2":
return "actual_data_staging/product2_catalog.csv"
class MatchTask(luigi.Task):
crawltime = CrawlTask.crawltime
vertical = luigi.Parameter()
spider = luigi.Parameter()
def requires(self):
return CrawlTask(spider=self.spider)
return FetchPC(vertical=self.vertical)
def output(self):
return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_matched_{}.csv".format(self.crawltime))
#return S3Target("s3://my-bucket/mass/crawl_luigi_test_matched_{}.csv".format(CrawlTask.crawltime))
def run(self):
if self.vertical == 'product1':
switch_board(requires.CrawlTask(), requires.FetchPC())
MatchTask 指的是我编写的一个 python 脚本,它将抓取的产品与我的产品目录进行比较。它看起来像这样:
def create_search(value):
...
def clean_column(column):
...
def color_false_positive():
...
def switch_board(scrape, product_catalog):
# this function coordinates the whole script
【问题讨论】:
标签: python amazon-s3 scrapy luigi