【问题标题】:apply tensorflow probability model on spark dataset在火花数据集上应用张量流概率模型
【发布时间】:2021-09-21 15:13:19
【问题描述】:
我正在使用来自 tensorflow 概率的 sts 来生成预测,这在我使用的数据样本上表现良好,但我现在想在更广泛的范围内尝试它,所以我想在 PySpark 中实现 sts 模型.
我有一个如下所示的数据集:
| Id |
Date |
value |
| 1 |
01/01/2021 |
10 |
| 1 |
01/02/2021 |
15 |
| 1 |
01/03/2021 |
11 |
| 2 |
01/01/2021 |
100 |
| 2 |
01/02/2021 |
120 |
| 2 |
01/03/2021 |
90 |
| ... |
... |
... |
我想找到一种方法来为每个 id(具有相同数量的条目)创建预测,并且需要将我之前构建的模型应用到每个 id,最好的方法是什么?
【问题讨论】:
标签:
tensorflow
pyspark
tensorflow-probability
【解决方案1】:
我通过在 pyspark 中使用 pandas_udf 解决了这个问题:
# Create the pandas udf:
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def Forecast(pdf):
# From a pandas dataframe create a serie with timestamp as index
pdf = pdf.sort_values(by=['Date'])
PTS = pdf.drop(columns=['Id'])
PTS.set_index("date", inplace=True)
PTS.index = utils.add_freq(PTS.index, 'MS')
PTS.loc[:, 'value'] = PTS.loc[:, 'value'].astype(float)
_train = PTS['value'][PTS.index < Split_Date]
train = _train.to_numpy().reshape(-1, 1)
forecast_distribution = utils.myForecast(train)
fcst_mu = forecast_distribution.mean().numpy()[..., 0]
list_date = pdf.loc[pdf['Date'] >= Split_Date, 'Date'].tolist()
for i in range(len(list_date)):
pdf.loc[pdf['Date'] == list_date[i], 'QTY'] = fcst_mu[i]
return pdf
# Apply the function per group of Id:
df= df.groupby('Id').apply(Forecast)
其中myForecast 是在另一个文件中创建的 STS 模型,add_freq 是添加句点的函数。