【Question title】: Dependency error messages when running luigi pipeline
【Posted】: 2019-09-29 20:32:26
【Question】:

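I am trying to create a pipeline that starts with a class that splits a file into multiple CSVs based on the state the user is in. It then looks at the files created for the different states and tries to determine whether a user moved from one state to another, returning 1 if the user did and 0 if he/she did not. Next it fits a Gaussian KDE to these "probabilities", saves it as a pickle, and finally draws samples from the pickle and saves them as CSVs.

I am using luigi to build this pipeline, but I keep getting error messages when I try to run my code. The pipeline seems to fail when running the state_to_state class.

Here is the code I have written: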

separate_csv.py:

import luigi
import pandas as pd
import numpy as np
import os
import state_to_state_transitions2 as sst
class data_filter(luigi.Task):
    file = pd.read_csv('/Users/emmanuels/Desktop/Attribution/finalcleanattributiondata.csv')
    actions = file.state.unique()
    def run(self):
        for current in self.actions:
            filter_file = self.file.loc[self.file.state.str.contains(current,na=False)]
            filter_file.to_csv('/Users/emmanuels/Documents/AttributionData/Data/'+str(current)+'.csv')
    def requires(self):
        return []
    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/'+str(self.actions)+'.csv')

state_to_state_transitions2.py:

import luigi
import pandas as pd
import separate_csv
class state_to_state(luigi.Task):
    first_file = luigi.Parameter()
    second_file = luigi.Parameter()
    def run(self):
        #iterate through states and find probability of anonymous id existing in next state
        first = pd.read_csv(self.first_file)
        second = pd.read_csv(self.second_file)
        first['probability'] = first.anonymous_id.isin(second.anonymous_id).astype(int)
        #save anonymous id along with probability (1,0) of whether or not it exists in the next state
        first[['anonymous_id','probability']].to_csv('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.first_file.split('/')[6][:-4]+'to'+self.second_file.split('/')[7][:-4]+'.csv'))
    def requires(self):
        return separate_csv.data_filter()
    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.first_file.split('/')[6][:-4]+'to'+self.second_file.split('/')[6][:-4]+'.csv'))

gaussian_kdefit.py:

import pandas as pd
import pickle
from scipy import stats
import luigi
import state_to_state_transitions2 as sst
class save_distributions(luigi.Task):
    file_tag = luigi.Parameter()
    path = '/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'
    def run(self):
        data = pd.read_csv(path+self.file_tag)
        kernel = stats.gaussian_kde(data['probability'])
        #we fit the distribution and save as a pickle
        pickle.dump(kernel,open('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+'probabs'+'.pck','wb'))
    def requires(self):
        files = ['Session.csv','lead.csv','opportunity.csv','complete.csv']
        task_list = []
        for i in range(1,len(files)):
            one = self.path+str(files[i-1])
            two = self.path+str(files[i])
            task_list.append(sst.state_to_state(first_file=one,second_file=two))
        return task_list
    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+'probabs'+'.pck')

get_samples.py:

import pandas as pd
import luigi
import gaussian_kdefit as gkde
#takes n samples and saves sample in csv
class sample_output(luigi.Task):
    file_tag = luigi.Parameter()
    size = luigi.Parameter()
    def run(self):
        kernel = pd.read_pickle('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+'probabs'+'.pck')
        kernel = kernel.resample(int(self.size))
        pd.DataFrame(kernel).transpose().to_csv('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+'+sampleprobabs'+'.csv')
    def requires(self):
        files = ['Sessiontolead.csv', 'leadtoopportunity.csv', 'opportunitytocomplete.csv']
        return [gkde.save_distributions(file_tag=file) for file in files]
    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+'+sampleprobabs'+'.csv')

And my wrapper class:

import get_samples as getsamps
import pandas as pd
import luigi
class wrapper(luigi.WrapperTask):
    def requires(self):
        file_tag = ['Sessiontolead', 'leadtoopportunity', 'opportunitytocomplete']
        task_list = []
        size = 10
        for i in range(0,len(file_tag)):
            for k in range(1,size):
                task_list.append(getsamps.sample_output(file_tag=file_tag[i],size=size))
        return task_list
    def run(self):
        print('Wrapper ran')
        pd.DataFrame().to_csv('/Users/emmanuels/Documents/AttributionData/Data/wrangler1.csv')
    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/wrangler1.csv')
if __name__ == '__main__':
    luigi.build([wrapper()],workers=8,local_scheduler=True)

Here is a sample of the final clean attribution file:

{'Unnamed: 0': {0: 0, 1: 1, 2: 2, 3: 3, 4: 4},
 'uniques': {0: '2019-06-18 09:00:000000a6a0-00bc-475f-a9e5-9dcbb4309e78https://signup.yoc.com/signup/v1/https://signup.yoc.com/signup/v1/step/businessDetails/',
  1: '2019-06-18 09:00:000000a6a0-00bc-475f-a9e5-9dcbb4309e78https://signup.yoc.com/signup/v1/https://signup.yoc.com/signup/v1/step/businessDetails/',
  2: '2019-06-18 09:00:000000a6a0-00bc-475f-a9e5-9dcbb4309e78https://signup.yoc.com/signup/v1/https://signup.yoc.com/signup/v1/step/userDetails/',
  3: '2018-05-17 20:00:000000c924-5959-4e2d-8757-0d10f96ca462http://m.facebook.com/https://www.yoc.com/signup/',
  4: '2019-02-24 16:00:000002269a-1e39-4cdf-a43e-cecf0a277c1chttps://signup.yoc.com/continue/1551024250465-dfd0e1d5-b76a-4bfa-bc29-9fcf5ef6b91c'},
 'anonymous_id': {0: '0000a6a0-00bc-475f-a9e5-9dcbb4309e78',
  1: '0000a6a0-00bc-475f-a9e5-9dcbb4309e78',
  2: '0000a6a0-00bc-475f-a9e5-9dcbb4309e78',
  3: '0000c924-5959-4e2d-8757-0d10f96ca462',
  4: '0002269a-1e39-4cdf-a43e-cecf0a277c1c'},
 'user_id': {0: '1560849071242-a19cdf50-ceec-41a0-ab51-ba9a45c8cda9',
  1: '1560849071242-a19cdf50-ceec-41a0-ab51-ba9a45c8cda9',
  2: '1560849071242-a19cdf50-ceec-41a0-ab51-ba9a45c8cda9',
  3: nan,
  4: nan},
 'ts': {0: '2019-06-18 09:11:14.409000',
  1: '2019-06-18 09:11:15.028000',
  2: '2019-06-18 09:12:03.118000',
  3: '2018-05-17 20:31:32.203000',
  4: '2019-02-24 16:08:32.661000'},
 'url': {0: 'https://signup.yoc.com/signup/v1/step/businessDetails/',
  1: 'https://signup.yoc.com/signup/v1/step/businessDetails/',
  2: 'https://signup.yoc.com/signup/v1/step/userDetails/',
  3: 'https://www.yoc.com/signup/',
  4: 'https://signup.yoc.com/continue/1551024250465-dfd0e1d5-b76a-4bfa-bc29-9fcf5ef6b91c'},
 'path': {0: '/za/signup/v1/step/businessDetails/',
  1: '/za/signup/v1/step/businessDetails/',
  2: '/za/signup/v1/step/userDetails/',
  3: '/za/signup/',
  4: '/continue/1551024250465-dfd0e1d5-b76a-4bfa-bc29-9fcf5ef6b91c'},
 'referrer_domain': {0: 'signup.yoc.com',
  1: 'signup.yoc.com',
  2: 'signup.yoc.com',
  3: 'm.facebook.com',
  4: nan},
 'utm_campaign': {0: nan, 1: nan, 2: nan, 3: nan, 4: nan},
 'utm_content': {0: nan, 1: nan, 2: nan, 3: nan, 4: nan},
 'utm_medium': {0: nan, 1: nan, 2: nan, 3: nan, 4: nan},
 'utm_source': {0: nan, 1: nan, 2: nan, 3: 'facebook', 4: nan},
 'user_agent': {0: 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36',
  1: 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36',
  2: 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36',
  3: 'Mozilla/5.0 (Linux; Android 8.0.0; SM-G965F Build/R16NW; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/66.0.3359.158 Mobile Safari/537.36 [FB_IAB/FB4A;FBAV/172.0.0.66.93;]',
  4: 'Opera/9.80 (Android; Opera Mini/38.1.2254/131.123; U; en) Presto/2.12.423 Version/12.16'},
 'rank': {0: 1, 1: 2, 2: 3, 3: 1, 4: 1},
 'state': {0: 'lead',
  1: 'lead',
  2: 'opportunity',
  3: 'Session',
  4: 'opportunity'}}

Here is the traceback I get:

Scheduled 9 tasks of which:
* 1 ran successfully:
    - 1 data_filter(file=/Users/emmanuels/Desktop/Attribution/finalcleanattributiondata.csv)
* 1 failed:
    - 1 state_to_state(first_file=/Users/emmanuels/Documents/AttributionData/Data/Session.csv, second_file=/Users/emmanuels/Documents/AttributionData/Data/lead.csv)
* 7 were left pending, among these:
    * 7 had failed dependencies:
        - 3 sample_output(file_tag=Sessiontolead, size=10) ...
        - 3 save_distributions(file_tag=Sessiontolead.csv,leadtoopportunity.csv,opportunitytocomplete.csv)
        - 1 wrapper()

....

INFO: [pid 45306] Worker Worker(salt=271561701, workers=1, host=Emmanuels-MacBook-Pro.local, username=emmanuels, pid=45306) running   state_to_state(first_file=/Users/emmanuels/Documents/AttributionData/Data/Session.csv, second_file=/Users/emmanuels/Documents/AttributionData/Data/lead.csv)
WARNING: Using wildcards in path /Users/emmanuels/Documents/AttributionData/Data/['lead' 'opportunity' 'Session' 'complete'].csv might lead to processing of an incomplete dataset; override exists() to suppress the warning.
ERROR: [pid 45306] Worker Worker(salt=271561701, workers=1, host=Emmanuels-MacBook-Pro.local, username=emmanuels, pid=45306) failed    state_to_state(first_file=/Users/emmanuels/Documents/AttributionData/Data/Session.csv, second_file=/Users/emmanuels/Documents/AttributionData/Data/lead.csv)
Traceback (most recent call last):
  File "/Users/emmanuels/anaconda3/lib/python3.7/site-packages/luigi/worker.py", line 175, in run
    raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
RuntimeError: Unfulfilled dependency at run time: data_filter__Users_emmanuels_c87d333278
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   state_to_state__Users_emmanuels__Users_emmanuels_c95a12621e   has status   FAILED

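【Comments】:

  • Can you provide the actual errors you are getting? Or describe them better?
  • @iHowell I have tried to add more detail about the error messages I am getting

Tags: python luigi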


【Solution 1】:

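After looking at your update, I noticed that your first task doesn't actually have any parameters. You only have a couple of class attributes. You shouldn't run pd.read_csv in a class-level variable declaration. Instead, do it in the run method (unless you need something based on the data, in which case read it in the requires method as well). Change file to a luigi.Parameter with a default value. Also, it is hard to tell what self.actions is supposed to be: str(self.actions) in output() renders the whole array, which gives the single path shown in your warning (.../Data/['lead' 'opportunity' 'Session' 'complete'].csv). run() never writes that file, so luigi never sees data_filter as complete. Parameters (and instance variables) should be primitives or serializable to primitives.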

Also, you have a circular import, which may be messing things up: separate_csv imports state_to_state_transitions2, and vice versa.

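Honestly, there is a lot going on here, and a lot that could be going wrong. I would work on simplifying your workflow and tackling one component at a time. You can also use luigi's input/output methods to pass data along the pipeline more cleanly.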

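An issue from my earlier answer that is still likely to cause problems:

Your problem may be that you are not using Luigi's built-in atomic file handling. Instead of opening the file yourself in state_to_state_transitions2: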

class state_to_state(luigi.Task):
    def run(self):
        ...
        first[['anonymous_id','probability']].to_csv(...)

You should open the file through luigi's output target rather than writing it yourself:

class state_to_state(luigi.Task):
    def run(self):
        ...
        with self.output().open('w') as out_csv:
            out_csv.write(first[['anonymous_id','probability']].to_csv())

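If you bypass the atomic file handling, the file gets created even when the write goes wrong partway through. Since luigi decides whether a task is complete by checking that its output exists, that stray file signals to luigi that the task is done.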
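
To show why this matters, here is a small standard-library sketch of the general write-to-a-temporary-file-then-rename pattern. It is not luigi's own implementation, and atomic_open is a hypothetical helper name; it only demonstrates that with this pattern a failed write leaves no file at the final path.

```python
import contextlib
import os
import tempfile


@contextlib.contextmanager
def atomic_open(path):
    """Yield a text handle; publish the file at `path` only on success."""
    directory = os.path.dirname(os.path.abspath(path))
    # Write into a temporary file in the same directory as the target.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix='.tmp')
    try:
        with os.fdopen(fd, 'w') as handle:
            yield handle
        # Reached only if the body raised nothing: move into place.
        os.replace(tmp_path, path)
    except BaseException:
        # The write failed: discard the partial file and re-raise.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

If the body of `with atomic_open(path) as handle:` raises, nothing appears at path, so an existence check on the output cannot mistake a half-written file for a finished one.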
