【问题标题】:Use pyspark to run a custom defined function from python使用 pyspark 从 python 运行自定义函数
【发布时间】:2019-10-27 19:37:47
【问题描述】:

我编写了一个自定义函数,可以从 .txt 文件中查找出现次数最多的单词。我需要通过 PySpark 作为 RDD 运行它

我写了一个函数top_five,它的唯一参数是file_name

import collections

def top_five(file_name):    

    file = open(file_name, 'r', encoding = 'utf8')

    list1 = []
    for line in file:
        print(line)
        words = line.split()
        for i in words:
            j =''.join(filter(str.isalpha, i))
            j = j.lower()
            if len(j) > 5:
                list1.append(j)            

    count = collections.Counter(list1)

    most_occur = count.most_common(5)

    print("The most used words in the Applied Data Science Textbook is:")
    for item in most_occur:
        print("\t" + item[0] + " occured " + str(item[1]) + " times")

    return

实际结果需要是top_five函数的最后3行,打印每个单词和出现次数

【问题讨论】:

    标签: python apache-spark pyspark custom-function


    【解决方案1】:

    在可能的情况下,您应该完全使用 Spark API,而不是尝试将依赖循环和本地状态的现有函数包装到特定的执行程序(通过使用字典计数器)

    file_name = 'README.md'
    
    spark = SparkSession.builder\
      .master('local[*]')\
      .getOrCreate()
    sc = spark.sparkContext
    
    rdd = sc.textFile(file_name)\
      .flatMap(lambda x: x.lower().split())\  # lowercase and split lines
      .map(lambda word: ''.join(filter(str.isalpha, word)))\  # remove non-alpha characters from words
      .filter(lambda word: len(word) > 5)\  # filter short words
      .map(lambda word: (word, 1))\  # count each words
      .reduceByKey(lambda a,b: a+b)\  # sum the counts by word
      .sortBy(lambda t: t[1], False)  # sort the words by descending counts
    
    # Collect to a Python list
    top_words = rdd.take(5)
    for word_pair in top_words:
        print(f'"{word_pair[0]}" occurred {word_pair[1]} times')
    

    【讨论】:

      【解决方案2】:

      目前尚不清楚上述哪些对象与您理想情况下要在此处扩充的 PySpark 数据框相关联。

      将您的函数转换为 PySpark UDF,这将允许您将逻辑应用到 PySpark 数据帧,而无需转换为 Pandas 并返回。

      对于非常大的数据帧,众所周知,UDF 的性能很差/需要很长时间才能运行。这也是我的亲身经历。

      【讨论】:

      • 我不知道您为什么要使用 UDF 只是为了进行字数统计和排序
      • @cricket_007 - 我同意有更好的方法来执行上述操作。根据您的观点,here 是在 PySpark 中执行字数统计的一个非常好的示例。我直接回答了OP的原始问题。没有重新提出问题。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-09-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-06-11
      • 2018-08-28
      • 2017-03-17
      相关资源
      最近更新 更多