【发布时间】:2020-02-27 10:04:24
【问题描述】:
我想计算一个大型数据集(1M 行)的相关矩阵。 这个想法是计算产品销售的相关性。如果两种产品的销售额同比增长/下降幅度相似,则可能存在相关性。
我已经尝试过这里的帖子:
- How to get correlation matrix values pyspark
- How to get the correlation matrix of a pyspark data frame?
- How to plot correlation heatmap when using pyspark+databricks
- https://gist.github.com/cameres/bc24ac6711c9e537dd20be47b2a83558
它们或多或少都做同样的事情,但它们会在驱动程序处收集相关矩阵。这是一个问题,因为大型数据集使得这个集合RAM 密集。我正在寻找一种方法来解决这个问题,并利用 Spark 的分布式计算。有 170k 个独特的产品,因此作业运行 170k 次,并且有 29B 个组合。
我的想法是逐列计算相关性(交叉应用),然后将其收集在数据框(或 RDD)中,以对其运行过滤器(仅相关性 > 0.8)。但我没有好主意开始。
数据集基本上是这样的。
d = {'Product': ['A', 'B', 'C','A', 'B', 'C','A', 'B', 'C'],\
'Year': [2010, 2010, 2010, 2011, 2011, 2011, 2012, 2012, 2012],\
'Revenue': [100, 200, 300, 110, 190, 320, 120, 220, 350]}
df = pd.DataFrame(data=d)
我将数据转置为列中的年份。
df = df.pivot(index='Product', columns='Year', values='Revenue').fillna(0)
我计算 pct_change 以获得每年的相对变化。
df_diff = df.pct_change(axis=1).replace([np.inf, -np.inf], np.nan).fillna(0)
Year 2010 2011 2012
Product
A 0.0 0.100000 0.090909
B 0.0 -0.050000 0.157895
C 0.0 0.066667 0.093750
而且我需要相关性... 用熊猫很容易
# change structure
df_diff = df_diff.stack().unstack(level=0)
# get correlation
df_diff = df_diff.corr().abs()
# change structure back
df_diff = df_diff.unstack().to_frame(name='value')
df_diff.index = df_diff.index.set_names(['Product_1', 'Product_2'])
df_diff.reset_index(inplace=True)
Product_1 Product_2 value
0 A A 1.000000
1 A B 0.207317
2 A C 0.933485
3 B A 0.207317
4 B B 1.000000
5 B C 0.544352
6 C A 0.933485
7 C B 0.544352
8 C C 1.000000
【问题讨论】:
-
如果您需要任何进一步的解释,请告诉我
-
由于对称性,还有一些冗余可以利用,我在下面的答案中没有这样做
-
谢谢。我会试试这个并尽快给你反馈。
标签: pandas apache-spark pyspark