【发布时间】:2016-11-02 06:18:10
【问题描述】:
所以这是一个我无法解决的问题,我也不知道有什么好方法可以用来制作 MCVE。本质上,它已经被简要讨论过here,但正如 cmets 所显示的,存在一些分歧,最终判决仍未定论。因此,我再次发布类似的问题,希望得到更好的答案。
背景
我有来自几千个传感器的传感器数据,我每分钟都会收到这些数据。我的兴趣在于预测数据。为此,我使用 ARIMA 系列的预测模型。长话短说,在与我的研究小组的其他成员讨论后,我们决定使用 R 包 forecast 中提供的 Arima 函数,而不是相同的 statsmodels 实现。
问题定义
因为我有来自几千个传感器的数据,我想至少分析一整周的数据(开始),因为一周有 7 天,所以我有 7 倍的数量传感器数据与我。基本上是大约 14k 传感器日组合。对于每个传感器日组合,查找最佳 ARIMA 顺序(将 BIC 最小化)并预测一周中的下一天数据大约需要 1 分钟。这意味着在单核上处理一周的数据需要 11 天以上的时间!
这显然是一种浪费,因为我还有 15 个核心一直处于闲置状态。因此,显然,这是并行处理的问题。请注意,每个传感器日组合不会影响任何其他传感器日组合。此外,我的其余代码的配置和优化都相当完善。
问题
问题是我得到了这个我无法在任何地方发现的奇怪错误。这是重现的错误:
Exception in thread Thread-3:
Traceback (most recent call last):
File "/home/kartik/miniconda3/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/home/kartik/miniconda3/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "/home/kartik/miniconda3/lib/python3.5/multiprocessing/pool.py", line 429, in _handle_results
task = get()
File "/home/kartik/miniconda3/lib/python3.5/multiprocessing/connection.py", line 251, in recv
return ForkingPickler.loads(buf.getbuffer())
File "/home/kartik/miniconda3/lib/python3.5/site-packages/rpy2/robjects/robject.py", line 55, in _reduce_robjectmixin
rinterface_level=rinterface_factory(rdumps, rtypeof)
ValueError: Mismatch between the serialized object and the expected R type (expected 6 but got 24)
以下是我发现的此错误的一些特征:
- 它在
rpy2包中提出 - 它与线程 3 有关。由于 Python 是零索引的,我猜这是第四个线程。因此,4x6 = 24,加起来就是最终错误语句中显示的数字
-
rpy2仅在我的代码中的一个地方使用,它可能必须将返回的值重新编码为 Python 类型。在try: ... except: ...中保护该行不会捕获该异常 - 当我放弃多处理并在循环中调用函数时不会引发异常
- 异常不会使程序崩溃,只是永远挂起它(直到我 Ctrl+C 终止它)
- 到目前为止我尝试的所有方法都没有解决错误
尝试过的事情
我已经尝试了从极端程序编码,处理最少情况的函数(即只有一个要并行调用的函数)到极端封装,其中if __name__ == '__main__': 中的可执行块调用一个函数读入数据,进行必要的分组,然后将组传递给另一个函数,该函数导入multiprocessing并并行调用另一个函数,该函数导入导入rpy2的处理模块,并将数据传递给Arima R中的函数。
基本上,rpy2 是否在函数嵌套的深处被调用和初始化并不重要,这样它不知道可能会初始化另一个实例,或者如果它被全局调用和初始化一次,则会引发错误如果涉及multiprocessing。
伪代码
这里尝试提供至少一些基本的伪代码,以便可能重现错误。
import numpy as np
import pandas as pd
def arima_select(y, order):
from rpy2 import robjects as ro
from rpy2.robjects.packages import importr
from rpy2.robjects import pandas2ri
pandas2ri.activate()
forecast = importr('forecast')
res = forecast.Arima(y, order=ro.FloatVector(order))
return res
def arima_wrapper(data):
data = data[['tstamp', 'val']]
data.set_index('tstamp', inplace=True)
return arima_select(data, (1,1,1))
def applyParallel(groups, func):
from multiprocessing import Pool, cpu_count
with Pool(cpu_count()) as p:
ret_list = p.map(func, [group for _, group in groups])
return pd.concat(ret_list, keys=[name for name, _ in groups])
def wrapper():
df = pd.read_csv('file.csv', parse_dates=[1], infer_datetime_format=True)
df['day'] = df['tstamp'].dt.day
res = applyParallel(df.groupby(['sensor', 'day']), arima_wrapper)
print(res)
显然,上述代码可以进一步封装,但我认为它应该相当准确地重现错误。
数据样本
这是上面伪代码中arima_wrapper 中的data.set_index('tstamp', inplace=True) 紧邻print(data.head(6)) 的输出:
或者,一个传感器一整周的数据可以简单地生成:
def data_gen(start_day):
r = pd.Series(pd.date_range('2016-09-{}'.format(str(start_day)),
periods=24*60, freq='T'),
name='tstamp')
d = pd.Series(np.random.randint(10, 80, 1440), name='val')
s = pd.Series(['sensor1']*1440, name='sensor')
return pd.concat([s, r, d], axis=1)
df = pd.concat([data_gen(day) for day in range(1,8)], ignore_index=True)
观察和问题
第一个观察是这个错误只在涉及multiprocessing 时引发,而不是在循环调用函数(arima_wrapper)时引发。因此,它必须以某种方式与多处理问题相关联。 R 对多进程不是很友好,但是按照伪代码所示的方式编写时,R 的每个实例都不应该知道其他实例的存在。
伪代码的结构方式,在multiprocessing 产生的多个子进程内的每个调用都必须初始化rpy2。如果这是真的,rpy2 的每个实例都应该产生自己的 R 实例,它应该只执行一个函数,然后终止。这不会引发任何错误,因为它类似于单线程操作。我在这里的理解是否准确,还是我完全或部分没有抓住重点?
如果rpy2 的所有实例都以某种方式共享 R 的实例,那么我可能会合理地预期该错误。真实情况:R 是否在rpy2 的所有实例之间共享,或者rpy2 的每个实例都有一个 R 实例?
如何解决这个问题?
由于 SO 讨厌其中包含多个问题的问题线程,因此我将优先考虑我的问题,以便接受部分答案。这是我的优先级列表:
- 如何解决这个问题?未提出问题的工作代码示例将被接受为答案,即使它没有回答任何其他问题,只要没有其他答案更好,或者之前发布过。
- 我对 Python 导入的理解是否准确,还是我错过了关于 R 的多个实例的要点?如果我错了,我应该如何编辑导入语句以便在每个子流程中创建一个新实例?这个问题的答案可能会为我指明一个可能的解决方案,并且会被接受,前提是没有更好的答案,或者之前发布过
- R 是在
rpy2的所有实例之间共享,还是rpy2的每个实例都有一个R 实例?此问题的答案只有在能够解决问题时才会被接受。
【问题讨论】:
标签: python r multithreading parallel-processing rpy2