【问题标题】:Using Prophet or Auto ARIMA with Ray将 Prophet 或 Auto ARIMA 与 Ray 结合使用
【发布时间】:2021-12-15 22:48:54
【问题描述】:

关于雷的一些事情我找不到明确的答案。 Ray 是一个用于数据处理和训练的分布式框架。为了使其以分布式方式工作,必须使用 Modin 或 Ray 支持的其他一些分布式数据分析工具,以便数据可以在整个集群上流动,但是如果我想使用像 Facebook 的 Prophet 或 ARIMA 这样的模型呢?熊猫数据框作为输入?当我使用 pandas 数据框作为模型函数的参数时,它会只在单个节点上工作,还是有可能的解决方法让它在集群上工作?

【问题讨论】:

    标签: python arima ray facebook-prophet modin


    【解决方案1】:

    Ray 能够使用 pandas 数据帧作为输入来训练模型!

    目前,ARIMA 需要做一些小工作,因为它通常在幕后使用 statsmodels 库。为了确保模型正确序列化,需要额外的 pickle 步骤。 Ray 可能会在未来消除对 pickle 工作的需要。

    查看泡菜解决方法的说明:https://alkaline-ml.com/pmdarima/1.0.0/serialization.html

    这里是 python 3.8 和 ray 1.8 的代码摘录。请注意,train_model() 和 inference_model() 函数的输入是 pandas 数据帧。额外的泡菜步骤嵌入在这些函数中。 https://github.com/christy/AnyscaleDemos/blob/main/forecasting_demos/nyctaxi_arima_simple.ipynb

    import ray
    import pandas as pd
    import pmdarima as pm
    from pmdarima.model_selection import train_test_split
    
    # read 8 months of clean, aggregated monthly taxi data
    filename = "https://github.com/christy/MachineLearningTools/blob/master/data/clean_taxi_monthly.parquet?raw=true"
    g_month = pd.read_parquet(filename) 
    
    # Define a train_model function, default train on 6 months, inference 2
    def train_model(theDF:pd.DataFrame, item_col:str
                    , item_value:str, target_col:str
                    , train_size:int=6) -> list:
    
        # split data into train/test
        train, test = train_test_split(theDF.loc[(theDF[item_col]==item_value), :], train_size=train_size)
        
        # train and fit auto.arima model
        model = pm.auto_arima(y=train[target_col]
                              ,X=train.loc[:, (train.columns!=target_col) 
                                              & (train.columns!=item_col)]
                             )
        # here is the extra pickle step to handle statsmodel objects
        return [train, test, pickle.dumps(model)]
    
    
    # Define inference_model function
    def inference_model(model_pickle:bytes, test:pd.DataFrame
                        , timestamp_col:str, item_col:str, target_col:str) -> pd.DataFrame:
    
        # unpickle the model
        model = pickle.loads(model_pickle)
        
        # inference on test data
        forecast = pd.DataFrame(model.predict(n_periods=test.shape[0]
                             , X=test.loc[:, (test.columns!=target_col) & (test.columns!=item_col)]
                             , index=test.index))
        
        return forecast
    
    
    # start-up ray on your laptop for testing purposes
    import ray
    NUM_CPU = 2
    ray.init(
        ignore_reinit_error=True
        , num_cpus = NUM_CPU
    )
    
    ###########
    # run your training as distributed jobs by using ray remote function calls
    ###########
        
    # Convert your regular python functions to ray remote functions
    train_model_remote = ray.remote(train_model).options(num_returns=3)  
    inference_model_remote = ray.remote(inference_model)
        
    # Train every model
    item_list = list(g_month['pulocationid'].unique())
    model = []
    train = []
    test = []
    
    for p,v in enumerate(item_list):
        # ray remote eval
        temp_train, temp_test, temp_model = \
            train_model_remote.remote(g_month
                                      , item_col='pulocationid', item_value=v
                                      , target_col='trip_quantity'
                                      , train_size=6)
        train.append(temp_train)
        test.append(temp_test)
        model.append(temp_model)
    
    # Inference every test dataset
    result=[]
    for p,v in enumerate(item_list):
        # ray remote eval
        result.append(inference_model_remote.remote(model[p], test[p]
                                                    , timestamp_col='pickup_monthly'
                                                    , item_col='pulocationid'
                                                    , target_col='trip_quantity'))
    
    # ray.get() means block until all objectIDs requested are available
    forecast = ray.get(result)
    

    【讨论】:

    猜你喜欢
    • 2020-09-17
    • 2020-11-01
    • 1970-01-01
    • 2011-10-30
    • 1970-01-01
    • 1970-01-01
    • 2023-02-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多