【问题标题】:pyspark - how to reducebykey to get the min of the one element of a tuple onlypyspark - 如何通过reducebykey仅获取元组的一个元素的最小值
【发布时间】:2020-05-02 01:36:55
【问题描述】:

假设我们在 csv 文件中有以下数据(站号、日期、数据类型、温度):

'ITE00100554','18000101','TMIN',-7.5

'ITE00100554','18000101','TMIN',-14.8

'ITE00100554','18000102','TMIN',-7.5

'ITE00100554','18000102','TMIN',-14.9

我尝试找到具有相应日期的最低温度。答案应该是:'ITE00100554', '18000102', 'TMIN', -14.9

这是我的代码:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MinTempDate")
sc = SparkContext(conf = conf)

# parsing (mapping) the input data
def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    date = fields[1]
    entryType = fields[2]
    temperature = float(fields[3])
    # creates key/value pairs
    return (stationID, date, entryType, temperature) 


lines = sc.textFile("1800.csv")
parsedLines = lines.map(parseLine)


minTemps = parsedLines.filter(lambda x: "TMIN" in x[2])


stationTemps = minTemps.map(lambda x: (x[0], (x[1], x[3])))


minTemps = stationTemps.reduceByKey(lambda x, y: (if x[1] >= y[1] y[0] else x[0],min(x[1], y[1])))

我的问题是最后一行的reduceByKey,因为语法不正确,我不知道如何确保我得到了最低温度和相应的日期。

如果我这样做:

minTemps = stationTemps.reduceByKey(lambda x, y: min(x, y))

它不起作用,因为它将返回元组的最小值,这意味着: 'ITE00100554', '18000101', 'TMIN', -14.8

这不是我想要的。

这篇文章非常接近我正在寻找的内容,但是我不知道如何更改我的语法以使其工作: Spark Python - how to use reduce by key to get minmum/maximum values

【问题讨论】:

    标签: pyspark


    【解决方案1】:

    将 lambda 中 if 子句的语法更改为,

     minTemps = stationTemps.reduceByKey(lambda x, y: (y[0] if x[1] >= y[1] else x[0],min(x[1],y[1])))
    

    【讨论】:

      【解决方案2】:
      from pyspark import SparkConf, SparkContext
      
      conf = SparkConf().setMaster("local").setAppName("MinTemperatures")
      sc = SparkContext(conf = conf)
      
      def parseLine(line):
          fields = line.split(',')
          stationID = fields[0]
          entryType = fields[2]
          temperature = float(fields[3])
          return (stationID, entryType, temperature)
      
      lines = sc.textFile("hdfs://..../1800.csv")
      parsedLines = lines.map(parseLine)
      
      minTemps = parsedLines.filter(lambda x: "TMIN" in x[1])
      stationTemps = minTemps.map(lambda x: (x[0], x[2]))
      
      miniTemps = stationTemps.reduceByKey(lambda x, y: min(x,y))
      results = miniTemps.collect()
      
      for result in results:
          print("Station Id: {0} \n Min Temp: {1}".format(result[0], result[1]))
      
      
      
      
      Output:
      
      Station Id: ITE00100554 
       Min Temp: -148.0
      Station Id: EZE00100082 
       Min Temp: -135.0
      

      【讨论】:

      • 请描述您的更改、更改原因以及解决问题的方式。差异并不那么引人注目。
      猜你喜欢
      • 1970-01-01
      • 2016-12-30
      • 1970-01-01
      • 1970-01-01
      • 2016-12-27
      • 2018-01-27
      • 2015-07-02
      • 2022-06-28
      • 2019-05-21
      相关资源
      最近更新 更多