【问题标题】:Running rpy2 in parallel using multiprocessing raises weird exception that cannot be caught使用多处理并行运行 rpy2 会引发无法捕获的奇怪异常
【发布时间】:2016-11-02 06:18:10
【问题描述】:

所以这是一个我无法解决的问题,我也不知道有什么好方法可以用来制作 MCVE。本质上,它已经被简要讨论过here,但正如 cmets 所显示的,存在一些分歧,最终判决仍未定论。因此,我再次发布类似的问题,希望得到更好的答案。

背景

我有来自几千个传感器的传感器数据,我每分钟都会收到这些数据。我的兴趣在于预测数据。为此,我使用 ARIMA 系列的预测模型。长话短说,在与我的研究小组的其他成员讨论后,我们决定使用 R 包 forecast 中提供的 Arima 函数,而不是相同的 statsmodels 实现。

问题定义

因为我有来自几千个传感器的数据,我想至少分析一整周的数据(开始),因为一周有 7 天,所以我有 7 倍的数量传感器数据与我。基本上是大约 14k 传感器日组合。对于每个传感器日组合,查找最佳 ARIMA 顺序(将 BIC 最小化)并预测一周中的下一天数据大约需要 1 分钟。这意味着在单核上处理一周的数据需要 11 天以上的时间!

这显然是一种浪费,因为我还有 15 个核心一直处于闲置状态。因此,显然,这是并行处理的问题。请注意,每个传感器日组合不会影响任何其他传感器日组合。此外,我的其余代码的配置和优化都相当完善。

问题

问题是我得到了这个我无法在任何地方发现的奇怪错误。这是重现的错误:

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/home/kartik/miniconda3/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/home/kartik/miniconda3/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/home/kartik/miniconda3/lib/python3.5/multiprocessing/pool.py", line 429, in _handle_results
    task = get()
  File "/home/kartik/miniconda3/lib/python3.5/multiprocessing/connection.py", line 251, in recv
    return ForkingPickler.loads(buf.getbuffer())
  File "/home/kartik/miniconda3/lib/python3.5/site-packages/rpy2/robjects/robject.py", line 55, in _reduce_robjectmixin
    rinterface_level=rinterface_factory(rdumps, rtypeof)
ValueError: Mismatch between the serialized object and the expected R type (expected 6 but got 24)

以下是我发现的此错误的一些特征:

  1. 它在rpy2 包中提出
  2. 它与线程 3 有关。由于 Python 是零索引的,我猜这是第四个线程。因此,4x6 = 24,加起来就是最终错误语句中显示的数字
  3. rpy2 仅在我的代码中的一个地方使用,它可能必须将返回的值重新编码为 Python 类型。在try: ... except: ... 中保护该行不会捕获该异常
  4. 当我放弃多处理并在循环中调用函数时不会引发异常
  5. 异常不会使程序崩溃,只是永远挂起它(直到我 Ctrl+C 终止它)
  6. 到目前为止我尝试的所有方法都没有解决错误

尝试过的事情

我已经尝试了从极端程序编码,处理最少情况的函数(即只有一个要并行调用的函数)到极端封装,其中if __name__ == '__main__': 中的可执行块调用一个函数读入数据,进行必要的分组,然后将组传递给另一个函数,该函数导入multiprocessing并并行调用另一个函数,该函数导入导入rpy2的处理模块,并将数据传递给Arima R中的函数。

基本上,rpy2 是否在函数嵌套的深处被调用和初始化并不重要,这样它不知道可能会初始化另一个实例,或者如果它被全局调用和初始化一次,则会引发错误如果涉及multiprocessing

伪代码

这里尝试提供至少一些基本的伪代码,以便可能重现错误。

import numpy as np
import pandas as pd

def arima_select(y, order):
    from rpy2 import robjects as ro
    from rpy2.robjects.packages import importr
    from rpy2.robjects import pandas2ri
    pandas2ri.activate()
    forecast = importr('forecast')

    res = forecast.Arima(y, order=ro.FloatVector(order))
    return res

def arima_wrapper(data):
    data = data[['tstamp', 'val']]
    data.set_index('tstamp', inplace=True)
    return arima_select(data, (1,1,1))

def applyParallel(groups, func):
    from multiprocessing import Pool, cpu_count
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for _, group in groups])
    return pd.concat(ret_list, keys=[name for name, _ in groups])

def wrapper():
    df = pd.read_csv('file.csv', parse_dates=[1], infer_datetime_format=True)
    df['day'] = df['tstamp'].dt.day
    res = applyParallel(df.groupby(['sensor', 'day']), arima_wrapper)
    print(res)

显然,上述代码可以进一步封装,但我认为它应该相当准确地重现错误。

数据样本

这是上面伪代码中arima_wrapper 中的data.set_index('tstamp', inplace=True) 紧邻print(data.head(6)) 的输出:

或者,一个传感器一整周的数据可以简单地生成:

def data_gen(start_day):
    r = pd.Series(pd.date_range('2016-09-{}'.format(str(start_day)),
                                periods=24*60, freq='T'),
                  name='tstamp')
    d = pd.Series(np.random.randint(10, 80, 1440), name='val')
    s = pd.Series(['sensor1']*1440, name='sensor')
    return pd.concat([s, r, d], axis=1)
df = pd.concat([data_gen(day) for day in range(1,8)], ignore_index=True)

观察和问题

第一个观察是这个错误只在涉及multiprocessing 时引发,而不是在循环调用函数(arima_wrapper)时引发。因此,它必须以某种方式与多处理问题相关联。 R 对多进程不是很友好,但是按照伪代码所示的方式编写时,R 的每个实例都不应该知道其他实例的存在。

伪代码的结构方式,在multiprocessing 产生的多个子进程内的每个调用都必须初始化rpy2。如果这是真的,rpy2 的每个实例都应该产生自己的 R 实例,它应该只执行一个函数,然后终止。这不会引发任何错误,因为它类似于单线程操作。我在这里的理解是否准确,还是我完全或部分没有抓住重点?

如果rpy2 的所有实例都以某种方式共享 R 的实例,那么我可能会合理地预期该错误。真实情况:R 是否在rpy2 的所有实例之间共享,或者rpy2 的每个实例都有一个 R 实例?

如何解决这个问题?

由于 SO 讨厌其中包含多个问题的问题线程,因此我将优先考虑我的问题,以便接受部分答案。这是我的优先级列表:

  1. 如何解决这个问题?未提出问题的工作代码示例将被接受为答案,即使它没有回答任何其他问题,只要没有其他答案更好,或者之前发布过。
  2. 我对 Python 导入的理解是否准确,还是我错过了关于 R 的多个实例的要点?如果我错了,我应该如何编辑导入语句以便在每个子流程中创建一个新实例?这个问题的答案可能会为我指明一个可能的解决方案,并且会被接受,前提是没有更好的答案,或者之前发布过
  3. R 是在rpy2 的所有实例之间共享,还是rpy2 的每个实例都有一个R 实例?此问题的答案只有在能够解决问题时才会被接受。

【问题讨论】:

    标签: python r multithreading parallel-processing rpy2


    【解决方案1】:

    (...) 长话短说 (...)

    真的吗?

    1. 如何解决这个问题?一个没有的工作代码示例 提出问题将被接受为答案,即使它没有回答 任何其他问题,只要没有其他答案更好,或者 较早发布。

    答案可能会给您留下相当多的工作......

    1. 我对 Python 导入的理解是否准确? 我错过了关于 R 的多个实例的观点?如果我错了,如何 我应该编辑导入语句以使新实例成为 在每个子流程中创建?这个问题的答案很可能 向我指出一个可能的解决方案,并且将被接受,前提是 没有更好的答案,或者之前发布过

    Python 包/模块是在您的进程中“唯一”导入的,这意味着在进程中使用包/模块的所有代码都使用相同的单一导入(在给定的块中,您没有每个 import 的副本)。

    因此,我建议在创建池时使用初始化函数,而不是在每次将任务发送给工作人员时重复导入 rpy2 并设置转换。如果每项任务都很短,您也可能会提高性能。

    def arima_select(y, order):
        # FIXME: check whether the rpy2.robjects package
        #        should be (re) imported as ro to be visible          
        res = forecast.Arima(y, order=ro.FloatVector(order))
        return res
    
    forecast = None
    
    def worker_init():
        from rpy2 import robjects as ro
        from rpy2.robjects.packages import importr
        from rpy2.robjects import pandas2ri
        pandas2ri.activate()
        global forecast
        forecast = importr('forecast')
    
    def applyParallel(groups, func):
        from multiprocessing import Pool, cpu_count
        with Pool(cpu_count(), worker_init) as p:
            ret_list = p.map(func, [group for _, group in groups])
        return pd.concat(ret_list, keys=[name for name, _ in groups])
    
    1. R 是否在所有人之间共享 rpy2 的实例,或者每个实例都有一个 R 实例 rpy2?此问题的答案只有在导致以下情况时才会被接受 解决问题。

    rpy2 通过链接其 C 共享库使 R 可用。 每个 Python 进程 一个这样的库,它是一个有状态的库(R 不能处理并发)。我认为您的问题与对象序列化(请参阅http://rpy2.readthedocs.io/en/version_2.8.x/robjects_serialization.html#object-serialization)有关,而不是与并发有关。

    在 Python 腌制 rpy2 对象后重建 R 对象时,发生了一些明显的混乱。更具体地说,当查看错误消息中提到的 R 对象类型时:

    >>> from rpy2.rinterface import str_typeint
    >>> str_typeint(6)
    'LANGSXP'
    >>> str_typeint(24)
    'RAWSXP'
    

    我猜forecast.Arima 返回的 R 对象包含未计算的 R 表达式(例如导致该结果对象的调用),并且在序列化和反序列化时它会以不同的方式返回(原始字节向量)。这可能是 R 自己的序列化机制的错误(因为 rpy2 在后台使用它)。现在,为了解决您的问题,您可能希望提取您最关心的 forecast.Arima 并仅从工作人员运行的函数调用中返回它。

    【讨论】:

    • 非常感谢您的回答。我会试试你的建议。我非常长的问题源于我自己在提出问题时的不确定性。感谢您花时间通读它。我把它分成几个部分,以便了解这些内容的人可以跳到他们认为相关的部分。我不介意做很多工作,我想知道幕后的工作原理,以便我可以适当地修改我的代码。到目前为止,我一直在盲目地更改代码,希望错误会神奇地消失。
    • 没问题。看来,这不是一个简单的问题,而且我以某种方式设法理解了它(所以你的成功;-))。
    • 顺便说一句,我编辑了我的答案,可能是问题的具体原因。
    • 我们上面讨论的方法不起作用。经过多次尝试,我仍然遇到同样的错误。我决定纯粹使用低级的rinterface 模块,而不是robjects 模块中的任何东西。 initr 初始化程序在我的问题中的 arima 函数以及 rimport 函数中被调用。现在,我收到此错误:RuntimeError: R cannot evaluate code before being initialized. 任何见解?
    • 基本上,我假设robjects 模块可能会在导入时初始化 R,并且由于导入是全局共享的,所有 R 作业都由一个 R 实例处理。但是,initr 似乎只在调用时才初始化 R,并且通过在每个进程中调用它,我希望每个进程都有一个 R 实例。现在它死于另一个奇怪的错误。请注意,与前一个一样,Thread-3 也会引发此错误。我在 8 核机器上运行,其中试验数据旋转 5 核。
    【解决方案2】:

    问题工作中提供的伪代码中arima_select函数的以下更改:

    import numpy as np
    import pandas as pd
    from rpy2 import rinterface as ri
    
    ri.initr()
    
    def arima_select(y, order):
    
        def rimport(packname):
            as_environment = ri.baseenv['as.environment']
            require = ri.baseenv['require']
            require(ri.StrSexpVector([packname]),
                    quiet = ri.BoolSexpVector((True, )))
            packname = ri.StrSexpVector(['package:' + str(packname)])
            pack_env = as_environment(packname)
            return pack_env
    
        frcst = rimport("forecast")
        args = (('y', ri.FloatSexpVector(y)),
                ('order', ri.FloatSexpVector(order)),
                ('include.constant', ri.StrSexpVector(const)))
        return frcst['Arima'].rcall(args, ri.globalenv)
    

    保持其余伪代码相同。请注意,我已经进一步优化了代码,它不需要问题中提供的所有功能。基本上,以下是必要且充分的:

    import numpy as np
    import pandas as pd
    from rpy2 import rinterface as ri
    
    ri.initr()
    
    def arima(y, order=(1,1,1)):
        # This is the same as arima_select above, just renamed to arima
        ...
    
    def applyParallel(groups, func):
        from multiprocessing import Pool, cpu_count
        with Pool(cpu_count(), worker_init) as p:
            ret_list = p.map(func, [group for _, group in groups])
        return pd.concat(ret_list, keys=[name for name, _ in groups])
    
    def main():
        # Create your df in your favorite way:
        def data_gen(start_day):
            r = pd.Series(pd.date_range('2016-09-{}'.format(str(start_day)),
                                        periods=24*60, freq='T'),
                          name='tstamp')
            d = pd.Series(np.random.randint(10, 80, 1440), name='val')
            s = pd.Series(['sensor1']*1440, name='sensor')
            return pd.concat([s, r, d], axis=1)
        df = pd.concat([data_gen(day) for day in range(1,8)], ignore_index=True)
    
        applyParallel(df.groupby(['sensor', pd.Grouper(key='tstamp', freq='D')]),
                      arima) # Note one may use partial from functools to pass order to arima
    

    请注意,我也不会直接从applyParallel 调用arima,因为我的目标是为给定系列(传感器和天)找到最佳模型。我使用函数arima_wrapper 来遍历顺序组合,并在每次迭代时调用arima

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2014-01-21
      • 2022-01-11
      • 1970-01-01
      • 2016-12-16
      • 2021-12-20
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多