600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > Spark RDD常用算子使用总结

Spark RDD常用算子使用总结

时间:2020-01-06 00:00:23

相关推荐

Spark RDD常用算子使用总结

文章目录

概述Transformation(转换算子)1. map2. flatMap3. filter4. mapPartitions5. mapPartitionsWithIndex6. sample7. mapValues8. union(并集)9. substract(差集)10. reduceByKey11. groupByKey12. combineByKey13. foldByKey14. aggregateByKey15. join16. sortBy17. repartitionAction(执行算子)1. reduce2. foreach3. count、countByKey、countByValue4. take、takeSample、first5. max、min、mean、sum(数字运算)代码汇总transformation 部分代码汇总action 部分代码汇总

概述

对于 Spark 处理的大量数据而言,会将数据切分后放入RDD作为Spark 的基本数据结构,开发者可以在 RDD 上进行丰富的操作,之后 Spark 会根据操作调度集群资源进行计算。总结起来,RDD 的操作主要可以分为 Transformation 和 Action 两种。

官方文档

(1)Transformation 转换操作:返回一个新的RDD which create a new dataset from an existing one所有Transformation函数都是Lazy,不会立即执行,需要Action函数触发 (2)Action动作操作:返回值不是RDD(无返回值或返回其他的) which return a value to the driver program after running a computation on the datase所有Action函数立即执行(Eager),比如count、first、collect、take等

此外注意RDD中函数细节:

第一点:RDD不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用了什么方法,传入什么函数);第二点:RDD中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算。只有当发生一个要求返回结果给Driver的Action动作时,这些转换才会真正运行。之所以使用惰性求值/延迟执行,是因为这样可以在Action时对RDD操作形成DAG有向无环图进行Stage的划分和并行优化,这种设计让Spark更加有效率地运行

Transformation(转换算子)

在Spark中Transformation操作表示将一个RDD通过一系列操作变为另一个RDD的过程,这个操作可能是简单的加减操作,也可能是某个函数或某一系列函数。值得注意的是Transformation操作并不会触发真正的计算,只会建立RDD间的关系图

如下图所示,RDD内部每个方框是一个分区。假设需要采样50%的数据,通过sample函数,从 V1、V2、U1、U2、U3、U4 采样出数据 V1、U1 和 U4,形成新的RDD。

1. map

源码:

def map[U](f : scala.Function1[T, U])(implicit evidence$3 : scala.reflect.ClassTag[U]) : org.apache.spark.rdd.RDD[U] = { /* compiled code */ }

表示将 RDD 经由某一函数 f 后,转变为另一个RDD。

只需要传入一个函数即可,如下代码,将原来的Seq集合中每个元素都乘以10,再返回结果,如下:

@Testdef mapTest(): Unit = {// 1. 创建RDDval rdd1 = sc.parallelize(Seq(1, 2, 3))// 2. 执行 map 操作val rdd2 = rdd1.map(item => item * 10)// 3. 得到结果val result = rdd2.collect() //通过调用collect来返回一个数组,然后打印输出result.foreach(item => println(item))}

运行结果:

102030Process finished with exit code 0

2. flatMap

源码:

def flatMap[U](f : scala.Function1[T, scala.TraversableOnce[U]])(implicit evidence$4 : scala.reflect.ClassTag[U]) : org.apache.spark.rdd.RDD[U] = { /* compiled code */ }

表示将 RDD 经由某一函数 f 后,转变为一个新的 RDD,但是与 map 不同,RDD 中的每一个元素会被映射成新的 0 到多个元素(f 函数返回的是一个序列 Seq)。

代码演示:

@Testdef flatMapTest() = {// 1. 创建RDDval rdd1 = sc.parallelize(Seq("Hello 吕布", "Hello 貂蝉", "Hello 铠"))// 2. 处理数据val rdd2 = rdd1.flatMap(item => item.split(" "))// 3. 查看结果val result = rdd2.collect()result.foreach(item => println(item))// 4. 关闭资源sc.stop()}

运行结果:

Hello吕布Hello貂蝉Hello铠Process finished with exit code 0

3. filter

源码:

def filter(f : scala.Function1[T, scala.Boolean]) : org.apache.spark.rdd.RDD[T] = { /* compiled code */ }

filter 可以过滤掉数据集中的一部分元素filter 中接受的函数,参数是每一个元素,如果这个函数返回true,当前元素就会被加入新数据集,如果返回false,当前元素会被过滤掉

代码演示:

@Testdef filter() = {sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).filter(item => item % 2 == 0) //取偶数.collect().foreach(item => println(item))}

运行结果:

246810Process finished with exit code 0

4. mapPartitions

源码:

def mapPartitions[U](f : scala.Function1[scala.Iterator[T], scala.Iterator[U]], preservesPartitioning : scala.Boolean = { /* compiled code */ })(implicit evidence$6 : scala.reflect.ClassTag[U]) : org.apache.spark.rdd.RDD[U] = { /* compiled code */ }

mapPartitions 和 map算子一样,只不过map是针对每一条数据进行转换,mapPartitions针对一整个分区的数据进行转换map的func参数是单条数据,mapPartitions的func参数是一个集合(一个分区所有的数据)map的func返回值也是单条数据,mapPartitions的func返回值是一个集合

代码演示:

@Testdef mapPartitions(): Unit = {// 1. 数据生成// 2. 算子使用// 3. 获取结果sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices = 2).mapPartitions(iter => {iter.foreach(item => println(item))iter}).collect()}

运行结果:

142536Process finished with exit code 0

如果想给上述集合中的元素都乘以10该,如何操作?

@Testdef mapPartitions2(): Unit = {// 1. 数据生成// 2. 算子使用// 3. 获取结果sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices = 2).mapPartitions(iter => {// 如果想给集合中的数字都乘10,该如何操作?// 遍历 iter 其中每一条数据进行转换,转换完成之后,返回 iterval result = iter.map(item => item * 10) //注意这个的map算子并不是RDD中的,而是Scala中的result}).collect().foreach(item => println(item))}

运行结果:

102030405060Process finished with exit code 0

5. mapPartitionsWithIndex

源码:

def mapPartitionsWithIndex[U](f : scala.Function2[scala.Int, scala.Iterator[T], scala.Iterator[U]], preservesPartitioning : scala.Boolean = { /* compiled code */ })(implicit evidence$9 : scala.reflect.ClassTag[U]) : org.apache.spark.rdd.RDD[U] = { /* compiled code */ }

mapPartitionsWithIndex 和 mapPartitions 的区别是 func 参数中多了一个参数,分区号

@Testdef mapPartitionsWithIndex(): Unit = {sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices = 2).mapPartitionsWithIndex( (index, iter) => {println("index: " + index)iter.foreach(item => println(item))iter} ).collect()}

运行结果:

index: 0123index: 1456Process finished with exit code 0

运行结果也有可能是这样:原因是RDD的并发性质

index: 1index: 0456123Process finished with exit code 0

6. sample

源码:

def sample(withReplacement : scala.Boolean, fraction : scala.Double, seed : scala.Long = { /* compiled code */ }) : org.apache.spark.rdd.RDD[T] = { /* compiled code */ }

采样,尽可能减少数据集的规律损失withReplacement 参数决定有放回或者无放回采样fraction 参数是采样比例

@Testdef sample() = {val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))val rdd2 = rdd1.sample(false, 0.6)//第一个参数为false代表无放回采样,0.6是采样比例val result = rdd2.collect()result.foreach(item => println(item))}

运行结果:

3456789Process finished with exit code 0

7. mapValues

源码:

def mapValues[U](f : scala.Function1[V, U]) : org.apache.spark.rdd.RDD[scala.Tuple2[K, U]] = { /* compiled code */ }

mapValues 也是 map,只不过map作用于整条数据,mapValues作用于 Value

@Testdef mapValues() = {sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))).mapValues(item => item * 10).collect().foreach(println(_))}

运行结果:

(a,10)(b,20)(c,30)Process finished with exit code 0

8. union(并集)

@Testdef union() = {val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))val rdd2 = sc.parallelize(Seq(3, 4, 5, 6, 7))rdd1.union(rdd2).collect().foreach(println(_))}

运行结果:

1234534567Process finished with exit code 0

9. substract(差集)

@Testdef subtract() = {val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))val rdd2 = sc.parallelize(Seq(3, 4, 5, 6, 7))rdd1.subtract(rdd2) //rdd1-rdd2.collect().foreach(println(_))}

运行结果:

12Process finished with exit code 0

10. reduceByKey

源码:

def reduceByKey(func : scala.Function2[V, V, V]) : org.apache.spark.rdd.RDD[scala.Tuple2[K, V]] = { /* compiled code */ }

聚合操作时,往往聚合过程中需要中间临时变量(到底时几个变量,具体业务而定)

@Testdef reduceByKey() = {// 1.创建RDDval rdd1 = sc.parallelize(Seq("Hello 吕布", "Hello 貂蝉", "Hello 铠"))// 2.处理数据val rdd2 = rdd1.flatMap(item => item.split(" ")).map(item => (item, 1)).reduceByKey((curr, agg) => curr + agg) //注意agg是一个临时变量,或者局部结果,起始值为0// 3.得到结果val result = rdd2.collect()result.foreach(item => println(item))// 4.关闭资源sc.stop()

运行结果:

(铠,1)(貂蝉,1)(Hello,3)(吕布,1)Process finished with exit code 0

11. groupByKey

RDD中groupByKey和reduceByKey区别???

reduceByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。简而言之,分组聚合

groupByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的函数,将相同key的值聚合到一起,与reduceByKey的区别是只生成一个sequence。简而言之就是只分组,不聚合

@Testdef groupByKey() = {sc.parallelize(Seq(("a", 1), ("a", 1), ("c", 3))).groupByKey().collect().foreach(println(_)) //只有一个参数打印输出可以简写}

运行结果:

(a,CompactBuffer(1, 1))(c,CompactBuffer(3))Process finished with exit code 0

12. combineByKey

源码:

def combineByKey[C](createCombiner : scala.Function1[V, C], mergeValue : scala.Function2[C, V, C], mergeCombiners : scala.Function2[C, C, C]) : org.apache.spark.rdd.RDD[scala.Tuple2[K, C]] = { /* compiled code */ }

CombineByKey 算子中接受三个参数: 转换数据的函数(初始函数,作用于第一条数据,用于开启整个计算),在分区上进行聚合,把所有分区的聚合结果聚合为最终结果

@Testdef combineByKey() = {// 1.准备集合val rdd: RDD[(String, Double)] = sc.parallelize(Seq(("铠", 100.0),("耀", 99.0),("镜", 99.0),("镜", 98.0),("铠", 97.0)))// 2.算子操作// 2.1 createCombiner 转换数据// 2.2 mergeValue 分区上的聚合// 2.3 mergeCombiners 把分区上的结果再次聚合,生成最终结果val combineResult = bineByKey(createCombiner = (curr: Double) => (curr, 1),mergeValue = (curr: (Double, Int), nextValue: Double) => (curr._1 + nextValue, curr._2 + 1),mergeCombiners = (curr: (Double, Int), agg:(Double, Int)) => (curr._1 + agg._1, curr._2 + agg._2))val resultRDD = combineResult.map( item => (item._1, item._2._1 / item._2._2))// 3. 输出数据resultRDD.collect().foreach(println(_))}

运行结果:

(铠,98.5)(耀,99.0)(镜,98.5)Process finished with exit code 0

13. foldByKey

源码:

def foldByKey(zeroValue : V)(func : scala.Function2[V, V, V]) : org.apache.spark.rdd.RDD[scala.Tuple2[K, V]] = { /* compiled code */ }

foldByKey 和 reduceByKey 的区别是可以指定初始值foldByKey 和 Scala中的 foldLeft、foldRight 区别是,这个初始值作用于每一个数据

@Testdef foldByKey() = {sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1))).foldByKey(10)((curr, agg) => curr + agg).collect().foreach(println(_))}

运行结果:

(a,22)(b,11)Process finished with exit code 0

14. aggregateByKey

源码:

def aggregateByKey[U](zeroValue : U)(seqOp : scala.Function2[U, V, U], combOp : scala.Function2[U, U, U])(implicit evidence$3 : scala.reflect.ClassTag[U]) : org.apache.spark.rdd.RDD[scala.Tuple2[K, U]] = { /* compiled code */ }

aggregateByKey(zeroValue)(seqOp, combOp) zeroValue:指定初始值seqOp:作用于每个元素,根据初始值,进行计算combOp:将 seqOp 处理过的结果进行聚合 aggregateByKey 比较适合针对每个数据要先处理,后聚合的场景

@Testdef aggregateByKey() = {val rdd = sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0)))rdd.aggregateByKey(0.8)((zeroValue, item) => item * zeroValue, (curr, agg) => curr + agg).collect().foreach(println(_))}

运行结果:

(手机,20.0)(电脑,16.0)Process finished with exit code 0

15. join

源码:

def join[W](other : org.apache.spark.rdd.RDD[scala.Tuple2[K, W]]) : org.apache.spark.rdd.RDD[scala.Tuple2[K, scala.Tuple2[V, W]]] = { /* compiled code */ }

@Testdef join() = {val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1)))val rdd2 = sc.parallelize(Seq(("a", 10), ("a", 11), ("b", 12)))rdd1.join(rdd2).collect().foreach(println(_))}

运行结果:

(a,(1,10))(a,(1,11))(a,(2,10))(a,(2,11))(b,(1,12))Process finished with exit code 0

16. sortBy

源码:

def sortBy[K](f : scala.Function1[T, K], ascending : scala.Boolean = { /* compiled code */ }, numPartitions : scala.Int = { /* compiled code */ })(implicit ord : scala.Ordering[K], ctag : scala.reflect.ClassTag[K]) : org.apache.spark.rdd.RDD[T] = { /* compiled code */ }

sortBy 可以用于任何类型数据的RDD,sortByKey 只有 KV 类型数据的RDD中才有sortBy 可以按照任何部分顺序来排序,sortByKey 只能按照 Key 来排序sortByKey 写发简单,不用编写函数了

@Testdef sort() = {val rdd1 = sc.parallelize(Seq(2, 4, 1, 5, 1, 8))val rdd2 = sc.parallelize(Seq(("a", 1), ("b", 3), ("c", 2)))println("-----------------------------")rdd1.sortBy(item => item).collect().foreach(println(_))println("-----------------------------")rdd2.sortBy(item => item._2).collect().foreach(println(_))println("-----------------------------")rdd2.sortByKey().collect().foreach(println(_))}

运行结果:

-----------------------------112458-----------------------------(a,1)(c,2)(b,3)-----------------------------(a,1)(b,3)(c,2)Process finished with exit code 0

17. repartition

源码:

def repartition(numPartitions : scala.Int)(implicit ord : scala.Ordering[T] = { /* compiled code */ }) : org.apache.spark.rdd.RDD[T] = { /* compiled code */ }

repartition 进行重分区的时候,默认是 shuffle 的coalesce 进行重分区的时候,默认是不 shuffle 的,coalesce 默认不能增大分区数

@Testdef partitioning() = {val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 2)println(rdd.repartition(5).partitions.size)println(rdd.repartition(1).partitions.size)println(rdd.coalesce(5, shuffle = true ).partitions.size)}}

515Process finished with exit code 0

Action(执行算子)

不同于Transformation操作,Action操作代表一次计算的结束,不再产生新的 RDD,将结果返回到Driver程序或者输出到外部。所以Transformation操作只是建立计算关系,而Action 操作才是实际的执行者。每个Action操作都会调用SparkContext的runJob 方法向集群正式提交请求,所以每个Action操作对应一个Job。

1. reduce

源码:

def reduce(f : scala.Function2[T, T, T]) : T = { /* compiled code */ }

函数中传入的 curr参数,并不是 Value,而是一整条数据reduce 整体上的结果,只有一个聚合的时候,往往需要聚合中间临时变量

@Testdef reduce() = {val rdd = sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0)))val result: (String, Double) = rdd.reduce((curr, agg) => ("总价", curr._2 + agg._2))println(result) // reduce的结果是一个元组}

运行结果:

(总价,45.0)Process finished with exit code 0

2. foreach

源码:

def foreach(f : scala.Function1[T, scala.Unit]) : scala.Unit = { /* compiled code */ }

RDD中自带的foreach算子,注意输出的结果顺序不一定按照原来Seq集合中的顺序,是因为RDD是并行计算,异步操作。

@Testdef foreach() = {val rdd = sc.parallelize(Seq(1, 2, 3, 4))rdd.foreach(item => println(item))}

运行结果:

3124Process finished with exit code 0

3. count、countByKey、countByValue

count 和 countByKey 的结果相距很远,每次调用 Action 都会生成一个 job,job 会运行获取结果,所以在俩个 job中间有大量的 Log,其实就是在启动jobcountByKey的运算结果是一个Map型数据:Map(a -> 2, b -> 1, c -> 1)数据倾斜:如果要解决数据倾斜,是不是要先知道谁倾斜,通过countByKey可以查看Key对应的数量,从而解决倾斜问题

@Testdef count() = {val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4)))println(rdd.count()) // 求出集合中数据的总数println(rdd.countByKey()) // 得出 Keyprintln(rdd.countByValue())}

运行结果:

4Map(a -> 2, b -> 1, c -> 1)Map((b,2) -> 1, (c,3) -> 1, (a,1) -> 1, (a,4) -> 1)Process finished with exit code 0

4. take、takeSample、first

take() 和 takeSample() 都是获取数据,一个是直接获取,一个是采样获取(又放回、无放回)first:一般情况下,action 会从所有分区获取数据,相对来说速度比较慢,first 只是获取第一个元素所有只会处理第一个分区,所以速度很快,无需处理所有数据

@Testdef take() = {val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6))rdd.take(3).foreach(item => println(item)) // 返回前N个数据println(rdd.first()) // 返回第一个元素rdd.takeSample(withReplacement = false, num = 3).foreach(item => println(item))}

运行结果:

1231215Process finished with exit code 0

5. max、min、mean、sum(数字运算)

没有中位数,缺陷!

@Testdef numberic() = {val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 10, 99, 120, 7))println(rdd.max()) // 最大值println(rdd.min()) // 最小值println(rdd.mean()) // 均值println(rdd.sum()) //求和}

运行结果:

17.888888888888893251.0Process finished with exit code 0

代码汇总

transformation 部分代码汇总

import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}import org.junit.Testclass TransformationOp {val conf: SparkConf = new SparkConf().setAppName("transformation_op").setMaster("local[6]")val sc = new SparkContext(conf)/*mapPartitions 和 map算子一样,只不过map是针对每一条数据进行转换,mapPartitions针对一整个分区的数据进行转换,所以:* 1. map的func参数是单条数据,mapPartitions的func参数是一个集合(一个分区所有的数据)* 2. map的func返回值也是单条数据,mapPartitions的func返回值是一个集合*/@Testdef mapPartitions(): Unit = {// 1. 数据生成// 2. 算子使用// 3. 获取结果sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices = 2).mapPartitions(iter => {iter.foreach(item => println(item))iter}).collect()}@Testdef mapPartitions2(): Unit = {// 1. 数据生成// 2. 算子使用// 3. 获取结果sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices = 2).mapPartitions(iter => {// 如果想给集合中的数字都乘10,该如何操作?// 遍历 iter 其中每一条数据进行转换,转换完成之后,返回 iterval result = iter.map(item => item * 10) //注意这个的map算子并不是RDD中的,而是Scala中的result}).collect().foreach(item => println(item))}/*mapPartitionsWithIndex 和 mapPartitions 的区别是 func 参数中多了一个参数,分区号*/@Testdef mapPartitionsWithIndex(): Unit = {sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices = 2).mapPartitionsWithIndex( (index, iter) => {println("index: " + index)iter.foreach(item => println(item))iter} ).collect()}/*filter 可以过滤掉数据集中的一部分元素filter 中接受的函数,参数是每一个元素,如果这个函数返回true,当前元素就会被加入新数据集,如果返回false,当前元素会被过滤掉*/@Testdef filter() = {sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).filter(item => item % 2 == 0) //取偶数.collect().foreach(item => println(item))}/*sample 作用:采样,尽可能减少数据集的规律损失*/@Testdef sample() = {val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))val rdd2 = rdd1.sample(false, 0.6)//第一个参数为false代表无放回采样,0.6是采样比例val result = rdd2.collect()result.foreach(item => println(item))}/*mapValues 也是 map,只不过map作用于整条数据,mapValues作用于 Value*/@Testdef mapValues() = {sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))).mapValues(item => item * 10).collect().foreach(println(_))}/*交集*/@Testdef intersection() = {val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))val rdd2 = sc.parallelize(Seq(3, 4, 5, 6, 7))rdd1.intersection(rdd2).collect().foreach(println(_))}/*并集*/@Testdef union() = {val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))val rdd2 = sc.parallelize(Seq(3, 4, 5, 6, 7))rdd1.union(rdd2).collect().foreach(println(_))}/*差集*/@Testdef subtract() = {val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))val rdd2 = sc.parallelize(Seq(3, 4, 5, 6, 7))rdd1.subtract(rdd2) //rdd1-rdd2.collect().foreach(println(_))}/*只分组,不聚合groupByKey 运算结果格式:(Key, (value1, value2))reduceByKey 能不能在 Map 端做 Combiner:1.能不能减少IOgroupByKey 在 Map端做 Combiner 没有意义*/@Testdef groupByKey() = {sc.parallelize(Seq(("a", 1), ("a", 1), ("c", 3))).groupByKey().collect().foreach(println(_)) //只有一个参数打印输出可以简写}/*CombineByKey 算子中接受三个参数:转换数据的函数(初始函数,作用于第一条数据,用于开启整个计算),在分区上进行聚合,把所有分区的聚合结果聚合为最终结果*/@Testdef combineByKey() = {// 1.准备集合val rdd: RDD[(String, Double)] = sc.parallelize(Seq(("铠", 100.0),("耀", 99.0),("镜", 99.0),("镜", 98.0),("铠", 97.0)))// 2.算子操作// 2.1 createCombiner 转换数据// 2.2 mergeValue 分区上的聚合// 2.3 mergeCombiners 把分区上的结果再次聚合,生成最终结果val combineResult = bineByKey(createCombiner = (curr: Double) => (curr, 1),mergeValue = (curr: (Double, Int), nextValue: Double) => (curr._1 + nextValue, curr._2 + 1),mergeCombiners = (curr: (Double, Int), agg:(Double, Int)) => (curr._1 + agg._1, curr._2 + agg._2))val resultRDD = combineResult.map( item => (item._1, item._2._1 / item._2._2))// 3. 输出数据resultRDD.collect().foreach(println(_))}/*foldByKey 和 reduceByKey 的区别是可以指定初始值foldByKey 和 Scala中的 foldLeft、foldRight 区别是,这个初始值作用于每一个数据*/@Testdef foldByKey() = {sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1))).foldByKey(10)((curr, agg) => curr + agg).collect().foreach(println(_))}/*aggregateByKey(zeroValue)(seqOp, combOp)zeroValue:指定初始值seqOp:作用于每个元素,根据初始值,进行计算combOp:将 seqOp 处理过的结果进行聚合aggregateByKey 比较适合针对每个数据要先处理,后聚合的场景*/@Testdef aggregateByKey() = {val rdd = sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0)))rdd.aggregateByKey(0.8)((zeroValue, item) => item * zeroValue, (curr, agg) => curr + agg).collect().foreach(println(_))}@Testdef join() = {val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1)))val rdd2 = sc.parallelize(Seq(("a", 10), ("a", 11), ("b", 12)))rdd1.join(rdd2).collect().foreach(println(_))}/*sortBy 可以用于任何类型数据的RDD,sortByKey 只有 KV 类型数据的RDD中才有sortBy 可以按照任何部分顺序来排序,sortByKey 只能按照 Key 来排序sortByKey 写发简单,不用编写函数了*/@Testdef sort() = {val rdd1 = sc.parallelize(Seq(2, 4, 1, 5, 1, 8))val rdd2 = sc.parallelize(Seq(("a", 1), ("b", 3), ("c", 2)))println("-----------------------------")rdd1.sortBy(item => item).collect().foreach(println(_))println("-----------------------------")rdd2.sortBy(item => item._2).collect().foreach(println(_))println("-----------------------------")rdd2.sortByKey().collect().foreach(println(_))}/*repartition 进行重分区的时候,默认是 shuffle 的coalesce 进行重分区的时候,默认是不 shuffle 的,coalesce 默认不能增大分区数*/@Testdef partitioning() = {val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 2)println(rdd.repartition(5).partitions.size)println(rdd.repartition(1).partitions.size)println(rdd.coalesce(5, shuffle = true ).partitions.size)}}

action 部分代码汇总

import org.apache.spark.{SparkConf, SparkContext}import org.junit.Testclass ActionOp {val conf = new SparkConf().setMaster("local[6]").setAppName("action_op")val sc = new SparkContext(conf)/*需求:最终生成 (结果, price)注意:1. 函数中传入的 curr参数,并不是 Value,而是一整条数据2. reduce 整体上的结果,只有一个*/@Testdef reduce() = {val rdd = sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0)))val result: (String, Double) = rdd.reduce((curr, agg) => ("总价", curr._2 + agg._2))println(result) // reduce的结果是一个元组}/*RDD中自带的foreach算子,注意输出的结果顺序不一定按照原来Seq集合中的顺序是因为RDD是并行计算,异步操作*/@Testdef foreach() = {val rdd = sc.parallelize(Seq(1, 2, 3, 4))rdd.foreach(item => println(item))}/*count 和 countByKey 的结果相距很远,每次调用 Action 都会生成一个 job,job 会运行获取结果,所以在俩个 job中间有大量的 Log,其实就是在启动jobcountByKey的运算结果是一个Map型数据:Map(a -> 2, b -> 1, c -> 1)数据倾斜:如果要解决数据倾斜,是不是要先知道谁倾斜,通过countByKey可以查看Key对应的数量,从而解决倾斜问题*/@Testdef count() = {val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4)))println(rdd.count()) // 求出集合中数据的总数println(rdd.countByKey()) // 得出 Keyprintln(rdd.countByValue())}/*take() 和 takeSample() 都是获取数据,一个是直接获取,一个是采样获取(又放回、无放回)first:一般情况下,action 会从所有分区获取数据,相对来说速度比较慢,first 只是获取第一个元素所有只会处理第一个分区,所以速度很快,无需处理所有数据*/@Testdef take() = {val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6))rdd.take(3).foreach(item => println(item)) // 返回前N个数据println(rdd.first()) // 返回第一个元素rdd.takeSample(withReplacement = false, num = 3).foreach(item => println(item))}// 等等数字运算... 注意对于数字类型的支持,都是Action@Testdef numberic() = {val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 10, 99, 120, 7))println(rdd.max()) // 最大值println(rdd.min()) // 最小值println(rdd.mean()) // 均值println(rdd.sum()) //求和}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。