【发布时间】:2015-09-29 06:57:59
【问题描述】:
我使用 Spark 是为了好玩并学习有关 MapReduce 的新知识。所以,我正在尝试编写一个建议新友谊的程序(即一种推荐系统)。如果两个人尚未建立联系并且有很多共同的朋友,则执行他们之间的友谊建议。
友谊文本文件的结构类似于以下:
1 2,4,11,12,15
2 1,3,4,5,9,10
3 2,5,11,15,20,21
4 1,2,3
5 2,3,4,15,16
...
语法为:ID_SRC1<TAB>ID_DST1,ID_DST2,...。
程序应输出(打印或文本文件)如下内容:
1 3,5
3 1
5 1
...
语法为:ID_SRC1<TAB>ID_SUGG1,ID_SUGG2,...。当然,如果两个人共享最少数量的朋友,则程序必须建议建立友谊,在我们的例子中假设为3。
我已经编写了我的程序,但我想阅读您提供的更好、更强大的解决方案。事实上,我认为我的代码可以改进很多,因为从 4.2 MB 的输入文件输出需要很长时间。
在我的代码下面:
from pyspark import SparkContext, SparkConf
def linesToDataset(line):
(src, dst_line) = line.split('\t')
src = int(src.strip())
dst_list_string = dst_line.split(',')
dst_list = [int(x.strip()) for x in dst_list_string if x != '']
return (src, dst_list)
def filterPairs(x):
# don't take into account pairs of a same node and pairs of already friends
if (x[0][0] != x[1][0]) and (not x[0][0] in x[1][1]) and (not x[1][0] in x[0][1]):
shared = len(list(set(x[0][1]).intersection(set(x[1][1]))))
return (x[0][0], [x[1][0], shared])
def mapFinalDataset(elem):
recommendations = []
src = elem[0]
dst_commons = elem[1]
for pair in dst_commons:
if pair[1] > 3: # 3 is the minimum number of friends in common
recommendations.append(pair[0])
return (src, recommendations)
def main():
conf = SparkConf().setAppName("Recommendation System").setMaster("local[4]")
sc = SparkContext(conf=conf)
rdd = sc.textFile("data.txt")
dataset = rdd.map(linesToDataset)
cartesian = dataset.cartesian(dataset)
filteredDatasetRaw = cartesian.map(filterPairs)
filteredDataset = filteredDatasetRaw.filter(lambda x: x != None)
# print filteredDataset.take(10)
groupedDataset = filteredDataset.groupByKey().mapValues(list)
# print groupedDataset.take(10)
finalDataset = groupedDataset.map(mapFinalDataset)
output = finalDataset.take(100)
for (k, v) in output:
if len(v) > 0:
print str(k) + ': ' + str(v)
sc.stop()
if __name__ == "__main__":
main()
【问题讨论】:
-
有什么问题?
-
我想知道是否有比我更好的解决方案。
标签: python performance mapreduce apache-spark pyspark