【问题标题】:Spark computation for suggesting new friendships用于建议新友谊的 Spark 计算
【发布时间】:2015-09-29 06:57:59
【问题描述】:

我使用 Spark 是为了好玩并学习有关 MapReduce 的新知识。所以,我正在尝试编写一个建议新友谊的程序(即一种推荐系统)。如果两个人尚未建立联系并且有很多共同的朋友,则执行他们之间的友谊建议。

友谊文本文件的结构类似于以下:

1   2,4,11,12,15
2   1,3,4,5,9,10
3   2,5,11,15,20,21
4   1,2,3
5   2,3,4,15,16
...

语法为:ID_SRC1<TAB>ID_DST1,ID_DST2,...

程序应输出(打印或文本文件)如下内容:

1   3,5
3   1
5   1
...

语法为:ID_SRC1<TAB>ID_SUGG1,ID_SUGG2,...。当然,如果两个人共享最少数量的朋友,则程序必须建议建立友谊,在我们的例子中假设为3

我已经编写了我的程序,但我想阅读您提供的更好、更强大的解决方案。事实上,我认为我的代码可以改进很多,因为从 4.2 MB 的输入文件输出需要很长时间。

在我的代码下面:

from pyspark import SparkContext, SparkConf

def linesToDataset(line):
    (src, dst_line) = line.split('\t')
    src = int(src.strip())

    dst_list_string = dst_line.split(',')
    dst_list = [int(x.strip()) for x in dst_list_string if x != '']

    return (src, dst_list)  

def filterPairs(x):
     # don't take into account pairs of a same node and pairs of already friends
    if (x[0][0] != x[1][0]) and (not x[0][0] in x[1][1]) and (not x[1][0] in x[0][1]):
        shared = len(list(set(x[0][1]).intersection(set(x[1][1]))))
        return (x[0][0], [x[1][0], shared])

def mapFinalDataset(elem):
    recommendations = []
    src = elem[0]
    dst_commons = elem[1]
    for pair in dst_commons:
        if pair[1] > 3: # 3 is the minimum number of friends in common
            recommendations.append(pair[0])
    return (src, recommendations)

def main():
    conf = SparkConf().setAppName("Recommendation System").setMaster("local[4]")
    sc = SparkContext(conf=conf)
    rdd = sc.textFile("data.txt")

    dataset = rdd.map(linesToDataset)

    cartesian = dataset.cartesian(dataset)
    filteredDatasetRaw = cartesian.map(filterPairs)
    filteredDataset = filteredDatasetRaw.filter(lambda x: x != None)
#   print filteredDataset.take(10)

    groupedDataset = filteredDataset.groupByKey().mapValues(list)
#   print groupedDataset.take(10)

    finalDataset = groupedDataset.map(mapFinalDataset)
    output = finalDataset.take(100)
    for (k, v) in output:
        if len(v) > 0:
            print str(k) + ': ' + str(v)

    sc.stop()


if __name__ == "__main__":
    main()

【问题讨论】:

  • 有什么问题?
  • 我想知道是否有比我更好的解决方案。

标签: python performance mapreduce apache-spark pyspark


【解决方案1】:

当然是一种观点。

我认为我即将提出的策略在性能和可读性方面更好,但这必须是主观的。主要原因是我避免使用笛卡尔积,将其替换为 JOIN。

替代策略

说明

我提出的策略是基于基础数据线

1   2,4,11,12,15

可以认为是“交友建议”列表,意思是这行告诉我:“2 应该是 4、11、12、15 的朋友”、“4 应该是 2、11、12、15 的朋友” "等等。

因此,我的实现要点是

  1. 将每一行变成一个建议列表(foo 应该是 bar 的朋友)
  2. 按人分组建议(foo 应该是 bar、baz、bar 的朋友)并重复
  3. 计算重复的数量(foo 应该是 bar(2 个建议)、baz(1 个建议)的朋友
  4. 删除现有关系
  5. 过滤出现频率太低的建议
  6. 打印结果

实施

由于我更喜欢​​ Java/scala,请原谅 scala 语言,但它应该很容易映射到 Python。

首先,从您的文本文件中创建基本的友谊数据

def parseLine(line: String): (Int, Array[String]) = {
  (Integer.parseInt(line.substring(0, line.indexOf("\t"))), line.substring(line.indexOf("\t")+1).split(","))
}
def toIntegerArray(strings: Array[String]): Array[Int] = { 
  strings.filter({ x => !x.isEmpty() }).map({ x => Integer.parseInt(x) }) 
}
// The friendships that exist
val alreadyFriendsRDD = sc.textFile("src/data.txt", 4)
        // Parse file : (id of the person, [Int] of friends)
        .map { parseLine }
        .mapValues( toIntegerArray );

并将它们转换为建议

// If person 1 is friends with 2 and 4, this means we should suggest 2 to become friends with 4 , and vice versa
def toSymetricalPairs(suggestions: Array[Int]): TraversableOnce[(Int, Int)] = {
  suggestions.combinations(2)
             .map { suggestion => (suggestion(0), suggestion(1)) }
             .flatMap { suggestion => Iterator(suggestion, (suggestion._2, suggestion._1)) }
}
val suggestionsRDD = alreadyFriendsRDD
  .map( x => x._2 )
  // Then we create suggestions from the friends Array
  .flatMap( toSymetricalPairs ) 

一旦你有一个 RDD 的建议,重新组合它们:

def mergeSuggestion(suggestions: mutable.HashMap[Int, Int], newSuggestion: Int): mutable.HashMap[Int, Int] = {
  suggestions.get(newSuggestion) match {
    case None => suggestions.put(newSuggestion, 1)
    case Some(x) => suggestions.put(newSuggestion, x + 1)
  }
  suggestions
}
def mergeSuggestions(suggestions: mutable.HashMap[Int, Int], toMerge: mutable.HashMap[Int, Int]) = {
  val keySet = suggestions.keySet ++: toMerge.keySet
  keySet.foreach { key =>
    suggestions.get(key) match {
      case None => suggestions.put(key, toMerge.getOrElse(key, 0))
      case Some(x) => suggestions.put(key, x + toMerge.getOrElse(key, 0))
    }
  }
  suggestions
}

def filterRareSuggestions(suggestions: mutable.HashMap[Int, Int]): scala.collection.Set[Int] = {
  suggestions.filter(p => p._2 >= 3).keySet
}

// We reduce as a RDD of suggestion count by person
val suggestionsByPersonRDD = suggestionsRDD.combineByKey(
    // For each person, create a map of suggestion count
    (person: Int) => new mutable.HashMap[Int, Int](),           
    // For every suggestion, merge it into the map
    mergeSuggestion , 
    // When merging two maps, sum the suggestions
    mergeSuggestions
    )
    // We restrict to suggestions that occur more than 3 times
    .mapValues( filterRareSuggestions )

最后通过考虑已经存在的友谊来过滤建议

val suggestionsCleanedRDD = suggestionsByPersonRDD
  // We co-locate the suggestions with the known friends
  .join(alreadyFriendsRDD)
  // We clean the suggestions by removing the known friends
  .mapValues (_ match { case (suggestions, alreadyKnownFriendsByPerson) => {
    suggestions -- alreadyKnownFriendsByPerson
  }})

哪些输出,例如:

(49831,Set(49853, 49811, 49837, 49774))
(49835,Set(22091, 20569, 29540, 36583, 31122, 3004, 10390, 4113, 1137, 15064, 28563, 20596, 36623))
(49839,Set())
(49843,Set(49844))

意思是 49831 应该是 49853、49811、49837、49774 的朋友。

速度

在您的数据集和 2012 Corei5@2.8GHz(双核超线程)/2g RAM 上试用,我们在 1.5 分钟内完成。

【讨论】:

  • 感谢您的 GPI 解决方案。我想更好地了解你的策略。例如,如果我们遇到(1, Array(11,12,13)), ..., (30, Array(11,12,13)) 的情况会发生什么?
  • 您将如何为您的场景定义所需的输出?我有2个问题。首先,如果 1 是 11 的朋友,我必须假设 11 是 1 的朋友,对吧?第二:在您的场景中假设关系 (11, Array(11, 12, 13)) 吗?如果是这样,我必须假设实现不应该回答任何建议。
  • 1) 是的,友谊是相互的,所以如果 1 和 11 是朋友,那么 11 和 1 是朋友。 2) 关系 (11, Array(11, 12, 13)) 永远不会发生,即个人不能与自己成为朋友。
  • 抱歉造成误会。您在第一条评论中建议的示例数据集应理解为(1, Array(11,12,13)), ...(11, Array(1, 2, 3, ...10, 12, 13, ... 30), ..., (30, Array(11,12,13)),对吗?
  • No no... 11 只能与 130 成为朋友。所以正确的元组是(11, Array(1,30))
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-07-01
  • 2023-03-14
  • 1970-01-01
  • 2015-01-25
相关资源
最近更新 更多