【发布时间】:2021-11-23 13:13:41
【问题描述】:
假设我有一个task,它会下载给定日期的一些气象数据(仅出于本示例的目的)并将其保存在 CSV 文件中。
假设在第一次迭代中我只能从 API 下载该数据
class DownloadMetoDataTask(luigi.Task):
date = luigi.Parameter()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.downloader = MetoAPI()
def output(self):
return LocalTarget(f"meteo_data_{self.date}.csv")
def run(self):
data = self.downloader.get_data(self.date)
self.output().write(data)
后来,我们发现这些数据可以从本地存储/某些 DB/FTP 服务器或其他任何地方下载。
为了解决这个问题,我们可以:
- 继承自
DownloadMetoDataTask
class DownloadMetoDataTaskFromAPI(DownloadMetoDataTask):
#...
class DownloadMetoDataTaskFromDB(DownloadMetoDataTask):
#...
- 或通过将
self.downloader设置为可以在__init__方法中提供的实例属性来使用组合而不是继承
downloader 是一个用于下载气象数据的复杂对象。每个类都封装了具体的细节。
客户知道他们可以致电get_data(date) 并获得给定日期的气象数据。他们不担心如何检索数据(API、本地存储……)
class APIMetoDownloader:
def get_data(self, date):
# ...
class FTPMeteoDownloader:
def get_data(self, date)
# ....
我想选择 2,因为 1 会导致类增殖和复杂的类层次结构。
这是我尝试做的:
- 使用不是 Luigi 参数的实例属性
class MeteoDownloaderTask(luigi.Task):
date = luigi.Parameter()
def __init__(self, downloader, *args, **kwargs):
super().__init__(*args, **kwargs)
self.downloader = downloader
t = MeteoDownloaderTask(downloader= APIMetoDownloader(), date = "2021_11_15")
这失败了
---------------------------------------------------------------------------
UnknownParameterException Traceback (most recent call last)
<ipython-input-44-a0a196a3cb71> in <module>
----> 1 t = MeteoDownloaderTask(downloader= APIMetoDownloader(), date = "2021_11_15")
~/sandbox/luigi-sandbox/env/lib/python3.9/site-packages/luigi/task_register.py in __call__(cls, *args, **kwargs)
85
86 params = cls.get_params()
---> 87 param_values = cls.get_param_values(params, args, kwargs)
88
89 k = (cls, tuple(param_values))
~/sandbox/luigi-sandbox/env/lib/python3.9/site-packages/luigi/task.py in get_param_values(cls, params, args, kwargs)
410 raise parameter.DuplicateParameterException('%s: parameter %s was already set as a positional parameter' % (exc_desc, param_name))
411 if param_name not in params_dict:
--> 412 raise parameter.UnknownParameterException('%s: unknown parameter %s' % (exc_desc, param_name))
413 result[param_name] = params_dict[param_name].normalize(arg)
414
UnknownParameterException: MeteoDownloaderTask[args=(), kwargs={'downloader': <__main__.APIMetoDownloader object at 0x10ba3ca30>, 'date': '2021_11_15'}]: unknown parameter downloader
- 让下载器成为一个无关紧要的参数
class DownloadMetoDataTask(luigi.Task):
date = luigi.Parameter()
downloader = luigi.Parameter(significant=False)
t = DownloadMetoDataTask(downloader=APIDownloader(), date = "2021_11_15")
这是打印警告。此外,我发现添加一个“假”Luigi 参数只是为任务添加一个实例属性是令人困惑的。
/Users/mbo/sandbox/luigi-sandbox/env/lib/python3.9/site-packages/luigi/parameter.py:279: UserWarning: Parameter "downloader" with value "<__main__.APIDownloader object at 0x10aa52520>" is not of type string.
问题
- 对于解决方案 2:这对 Luigi 实例缓存有何影响? (对于具有相同重要参数集的
DownloadMetoDataTask的两个实例,task_id 相同) - 对于解决方案2:
self.downloader的接口有什么含义,它必须符合Parameter接口吗?
我还可以重新定义 Luigi.Task 的类方法(一个是 get_param_values )以仅获取定义为 Luigi 的参数并忽略常规实例属性。这样做是否安全/建议?
有没有更好的方法来实现这一点?使用必须注意 Luigi 任务参数的实例属性。
我不想做的事情
- 有一个假的
downloader字符串Luigi参数,可以用来获取下载器
downloaders_mapping = {"api": APIMetoDownloader(), "ftp": FTPMetoDownloader()}
class MeteoDownloaderTask(luigi.Task):
date = luigi.Parameter()
downloader_key = luigi.Parameter()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.downloader = downloaders_mapping[kwargs["self.downloader_key"]]
这增加了维护 downloaders_mapping 字典的额外开销
- 我不想为每个使用的下载器创建特定的类,因为这会导致类扩散和类的复杂长层次结构
【问题讨论】:
-
Luigi 在实例属性方面没有问题。问题是
downloader提供给__init__。 Luigi 任务旨在由用户调用。因此,如果用户不需要知道数据来自哪里,DownloadMetaDataTask不应该有downloader参数,run方法应该自己找出要附加的downloader。如果您将downloader设置得太早,您将在那里获得大量竞争条件。