map

map(func) Return a new distributed dataset formed by passing each element of the source through a function func.
返回通过函数func传递源的每个元素形成的新的分布式数据集。通过函数得到一个新的分布式数据集。

    var rdd = session.sparkContext.parallelize(1 to 10)
    rdd.foreach(println)
    println("=========================")
    rdd.map(x => (x,1)).foreach(println)

结果:

67891012345

=========================

(6,1)(7,1)(8,1)(9,1)(10,1)(1,1)(2,1)(3,1)(4,1)(5,1)

 

filter

filter(func) Return a new dataset formed by selecting those elements of the source on which funcreturns true.

通过自定义函数对元素进行过滤

    val rdd = session.sparkContext.parallelize(1 to 10)
    rdd.foreach(print)
    val rdd2 = rdd.filter(_>6)
    println("=========================")
    rdd2.foreach(print)

结果:

67891012345

=========================

78910

 

filtMap

flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

通过自定义函数把RDD中的每一个元素映射成多个元素,返回一个集合。

  val ds = session.sparkContext.textFile("D:/公司/test.txt")
    ds.foreach(println)
    
    val ds2 = ds.flatMap(x => {
      x.toString().split(":")
    })
    
    println("===================")
    
    ds2.foreach(println)

结果:

{ "DEVICENAME": "����4", "LID": 170501310, "ADDRESS": "xxxx", "ID": 230001160 }

===================

{ "DEVICENAME"
"����4", "LID"
170501310, "ADDRESS"
"xxxx", "ID"
230001160 }

mapFunction

mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.

类型map.不过是分区进行。类似于批量。

 

mapPartitionsWithIndex

mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

sample

sample(withReplacementfractionseed) Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.

采集一个RDD的随机样本。

其中包含三个参数

replacement 布尔类型,表示是否重样。

fraction 返回的比例数 介于0到1。如原来RDD数10,fraction=0.5,那么将返回一个长度为5的随机RDD。

seed  表示随机比例。默认为long的最大值。如果此值为恒值(不随机),那么返回的RDD相等。

union

union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument.

 将两个RDD合并,不去重。

intersection

intersection(otherDataset) Return a new RDD that contains the intersection of elements in the source dataset and the argument.

返回两个RDD的交集。去重。

    var rdd = session.sparkContext.parallelize(1 to 10)
    rdd.foreach(println)
    val rdd2 = rdd.sample(true, 0.5)
    println("==============")
    rdd2.foreach(println)
    val rdd3 = rdd.intersection(rdd2)
    println("==============")
    rdd3.foreach(println)

结果:

==============
89955
==============
958

 

distinct

distinct([numTasks])) Return a new dataset that contains the distinct elements of the source dataset.

对RDD进行去重。参数为任务数。

其内部实现原理对元素进行分组,然后取第一个。

/**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
  }

 

    var rdd = session.sparkContext.parallelize(1 to 10)
    val rdd2 = rdd.sample(true, 0.5)
    rdd2.foreach(print)
    println("====================")
    val rdd3 = rdd2.distinct(10)
    rdd3.foreach(print)

结果:

7792224

==============

4279

groupByKey

groupByKey([numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. 
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. 
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

对一个(k,v)对的数据集进行k的分组,并返回一个v的集合。此算子使用前提是一个(k,v)对的RDD。

官方的建议是,如果要进行类似操作,最好使用reduceByKey 或者aggregateByKey 。

相当于group by ,它不可以自定义函数。如果在这个基础上需要做count等运算,需要使用reduceByKey 或者aggregateByKey 。

 

groupBy

groupBy和groupByKey略有不同。1:groupby可以自定义key;2:在返回值上,groupby返回的是[key,{key:value1,key:value2}],而groupByKey返回的是[key,{value1,value2}]

    val seq = Seq[String]("spark", "hadoop", "spark")
    val rdd = session.sparkContext.parallelize(seq)
    val rdd2 = rdd.map(x => (x, 1)).groupBy(_._1)//默认元素为key,此时同groupByKey,但返回值略有不同
    rdd2.foreach(println)

    println("==============")
    val rdd4 = rdd.map(x => (x, 1)).groupBy(x => {
      x._1 + new Random().nextInt(100);//可以自定义key
    })

    rdd4.foreach(println)

    println("==============")
    val rdd3 = rdd.map(x => (x, 1)).groupByKey()//默认元素为key
    rdd3.foreach(println)

结果:

  (spark,CompactBuffer((spark,1), (spark,1)))
  (hadoop,CompactBuffer((hadoop,1)))
  ==============

  (spark92,CompactBuffer((spark,1)))
  (hadoop72,CompactBuffer((hadoop,1)))
  (spark46,CompactBuffer((spark,1)))

  ==============

  (spark,CompactBuffer(1, 1))
  (hadoop,CompactBuffer(1))

 

reduceByKey

reduceByKey(func, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

针对一个(K,V)对的RDD,返回一个对K去重的值。这个具体的值是什么样,取决于第一个参数。

    val seq = Seq[String]("spark", "hadoop", "spark")
    val rdd = session.sparkContext.parallelize(seq)
    val rdd2 = rdd.map(x => (x, 1)).reduceByKey(_+_)
    rdd2.foreach(println)

结果:

   (spark,2)

  (hadoop,1)

 

小结:groupBy groupByKey reduceByKey

1:groupBy可以自定义key;2:在返回值上,groupBy返回的是[key,{key:value1,key:value2}],而groupByKey返回的是[key,{value1,value2}]

2:reduceByKey(func, [numTasks])的第一个参数为自定义函数,可以对结果进行再处理。groupBy([numTasks])和groupByKey([numTasks])都不能自定义函数,如实现wordcount的功能,需额外使用算子或自定义实现。

spark 算子之RDD

 

 3:reduceByKey和groupByKey内部原理不一样。这一点在官方注释上已经讲得很明白。reduceByKey会经过类似于Map与reduce之间的combiner操作(similarly to a "combiner" in MapReduce.)。会将各个节点上的数据进行合并之后再进行传输。

reduceByKey

/**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }
View Code

相关文章:

  • 2022-12-23
  • 2021-05-22
  • 2022-01-13
  • 2021-06-04
  • 2021-08-13
  • 2021-06-06
  • 2022-12-23
猜你喜欢
  • 2021-10-21
  • 2022-12-23
  • 2021-03-30
  • 2022-02-17
  • 2021-12-01
  • 2021-04-18
  • 2021-10-30
相关资源
相似解决方案