600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > 大数据——Spark RDD常用算子总结

大数据——Spark RDD常用算子总结

时间:2020-06-03 09:07:18

相关推荐

大数据——Spark RDD常用算子总结

Spark的核心是建立在同一的抽象弹性分布式数据集(Resilient Distributed Datasets,RDD)之上的,这使得Spark的各个组件可以无缝的进行集成,能够在同一个应用程序中完成大数据处理

1.RDD基本概念

RDD是spark提供的最重要的抽象概念,它是一种有容错机制的特殊数据集合,可以分布在集群的节点上,以函数式操作集合的方式进行各种并行操作

可以将RDD理解为一个分布式对象集合,本质上是一个只读的分区记录集合。每个RDD可以分成多个分区,每个分区就是一个数据集片段。一个RDD的不同分区可以保存到集群中的不同节点上,从而可以在集群中的不同节点上进行并行计算。

弹性:

存储的弹性:内存与磁盘的自动切换

容错的弹性:数据丢失可以自动恢复

计算的弹性:计算出错重试机制

分片的弹性:可以根据需要重新分片

分布式:

数据存储在大数据集群不同节点上

数据集:

RDD封装了计算逻辑,并不保存数据

数据抽象:

RDD是一个抽象类,需要子类具体实现

不可变:

RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑(有向无环图DAG

可分区:

实现并行计算

2.RDD的核心属性

分区列表

RDD数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性

分区计算函数

Spark在计算时,是使用分区函数对每一个分区进行计算

RDD之间的依赖关系

RDD是计算模型的封装,当需求中需要将多个计算模型进行组合是,就需要将多个RDD建立依赖关系

分区器(可选)

当数据为KV类型数据时,可以通过设定分区器自定义数据的分区

首选位置(可选

计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算

3.RDD工作原理

1)启动Yarn集群环境

2)Spark通过申请资源创建调度节点和计算节点

3)Spark框架根据需求将计算逻辑根据分区划分成不同的任务

4)调度节点将任务根据计算节点状态发送到对应的计算节点进行计算

从以上流程可以看出RD在整个流程中主要用于讲逻辑进行封装,并生成Task发送给Executor节点执行计算

4.RDD创建

在Spark中创建RDD的方式可以分为四种:

1)从集合(内存)中创建RDD

从集合中创建RDD,Spark主要提供了两个方法:parallelize和makeRDD

val conf = new SparkConf().setMaster("local[*]").setAppName("test1")val sc = new SparkContext(conf)val rdd1 = sc.parallelize(List(1, 2, 3, 4))val rdd2 = sc.makeRDD(List(1, 2, 3, 4))println(rdd1.collect().mkString(","))println(rdd2.collect().mkString(","))sc.stop()

从底层代码实现来讲,makeRDD方法其实就是parallelize方法

/** Distribute a local Scala collection to form an RDD.** This method is identical to `parallelize`.* @param seq Scala collection to distribute* @param numSlices number of partitions to divide the collection into* @return RDD representing distributed collection*/def makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T] = withScope {parallelize(seq, numSlices)}

2)从外部存储(文件)创建RDD

由外部存储系统的数据集创建RDD包括:本地的文件系统,所有Hadoop支持的数据集,比如HDFS、HBase等

val conf = new SparkConf().setMaster("local[*]").setAppName("test1")val sc = new SparkContext(conf)val rdd = sc.textFile("file:///D:\\Study\\13_spark\\cha01\\file\\tweets.txt")rdd.collect().foreach(println)sc.stop()

3)从其他RDD创建

主要是通过一个RDD运算完后,在产生新的RDD

4)直接创建RDD(new)

使用new的方式直接构造RDD一般由Spark框架自身使用

5.RDD算子

Value类型

1)map

作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

需求:创建一个RDD,使每个元素*2组成新的RDD

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建RDDval listRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5))//4、每个数变成原来的2倍val rsRdd: RDD[Int] = listRdd.map(_ * 2)//5、打印rsRdd.foreach(println(_))}}

2)mapPartitions

类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。

需求:创建一个RDD,使每个元素*2组成新的RDD

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建RDDval listRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5))//4、每个数变成原来的2倍val rsRdd: RDD[Int] = listRdd.mapPartitions(i=>{i.map(_*2)})//5、打印rsRdd.foreach(println)}}

3)mapPartitionsWithIndex(func)

类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U];

需求:创建一个RDD,使每个元素跟所在分区形成一个元组组成一个新的RDD

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建2个分区的RDDval listRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 2)//4、每个数变成原来的2倍val rsRdd: RDD[(Int, Int)] = listRdd.mapPartitionsWithIndex((index, item) => {item.map((index, _))})//5、打印rsRdd.foreach(println)}}

4)flatMap(func)

对集合中每个元素进行操作然后再扁平化。

需求:把集合中元素按照’ '提取出来

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建2个分区的RDDval listRdd: RDD[String] = sc.makeRDD(List("Hello World","Hello Spark"))//4、扁平化val rsRdd: RDD[String] = listRdd.flatMap(_.split(" "))//5、打印rsRdd.foreach(println)}}

5)glom

作用:将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]

需求:创建一个3个分区的RDD,并将每个分区的数据放到一个数组

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建3个分区的RDDval listRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8), 3)//4、glomval rsRdd: RDD[Array[Int]] = listRdd.glom()//5、打印rsRdd.foreach(item => {println(item.mkString(","))})}}

6)groupBy

分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。

需求:创建一个RDD,按照元素模以2的值进行分组。

def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建RDDval listRdd: RDD[Int] = sc.makeRDD(1 to 20)//4、分组val rsRdd: RDD[(Int, Iterable[Int])] = listRdd.groupBy(_%2)//5、打印rsRdd.foreach(t=>println(t._1,t._2))}

7)filter

作用:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。

需求:创建一个RDD ,过滤出和2取余为0的数

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建RDDval listRdd: RDD[Int] = sc.makeRDD(1 to 10)//4、过滤val rsRdd: RDD[Int] = listRdd.filter(_%2==0)//5、打印rsRdd.foreach(println)}}

8)distinct

去重分区

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建RDDval listRdd: RDD[Int] = sc.makeRDD(List(1,2,1,2,1,2))//4、用2个分区保存结果val rsRdd: RDD[Int] = listRdd.distinct(2)//5、打印rsRdd.foreach(println)}}

9)repartition(numPartitions)

重分区,根据分区数,重新通过网络随机洗牌所有数据。

需求:创建一个4个分区的RDD,对其重新分区

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建4个分区的Rddval listRdd: RDD[Int] = sc.makeRDD(1 to 10, 4)println("repartition before:" + listRdd.partitions.size)//4、重新分区val rsRdd: RDD[Int] = listRdd.repartition(2)//5、打印println("repartition after:" + rsRdd.partitions.size)}}

10)sortBy(func,[ascending], [numTasks])

作用;使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。

需求:创建一个RDD,按照不同的规则进行排序

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建4个分区的Rddval listRdd: RDD[Int] = sc.makeRDD(1 to 10)//4、升序val sortAsc: Array[Int] = listRdd.sortBy(x => x).collect()println(sortAsc.mkString(","))//5、降序val sortDesc: Array[Int] = listRdd.sortBy(x => x, false).collect()println(sortDesc.mkString(","))}}

双Value类型交互

1) union(otherDataset)

作用:对源RDD和参数RDD求并集后返回一个新的RDD

需求:创建两个RDD,求并集

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建2个listval list1: RDD[Int] = sc.makeRDD(1 to 2)val list2: RDD[Int] = sc.makeRDD(2 to 4)//4、求并级val rsRdd: Array[Int] = list1.union(list2).collect()//5、打印rsRdd.foreach(println)}}

2)subtract (otherDataset)

差集,计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来,创建两个RDD,求第一个RDD与第二个RDD的差集

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建2个listval list1: RDD[Int] = sc.makeRDD(1 to 10)val list2: RDD[Int] = sc.makeRDD(5 to 10)//4、求并级val rsRdd: Array[Int] = list1.subtract(list2).collect()//5、打印rsRdd.foreach(println)}}

3)intersection(otherDataset)

交集,对源RDD和参数RDD求交集后返回一个新的RDD

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建2个listval list1: RDD[Int] = sc.makeRDD(1 to 10)val list2: RDD[Int] = sc.makeRDD(5 to 10)//4、交集val rsRdd: Array[Int] = list1.intersection(list2).collect()//5、打印rsRdd.foreach(println)}}

Key-Value类型

1)groupByKey

groupByKey也是对每个key进行操作,但只生成一个sequence。

需求:wordcount

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建listval list: RDD[String] = sc.makeRDD(List("Hello World","Hello Scala","Spark Spark Spark"))//4、wordcountval mapRdd: RDD[(String, Int)] = list.flatMap(_.split(" ")).map((_,1))//5、groupByKeyval groupByKeyRdd: RDD[(String, Iterable[Int])] = mapRdd.groupByKey()groupByKeyRdd.foreach(println)//6、聚合val wcRdd: RDD[(String, Int)] = groupByKeyRdd.map(t=>(t._1,t._2.size))wcRdd.foreach(println)}}

2)reduceByKey

在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

需求:wordcount

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建listval list: RDD[String] = sc.makeRDD(List("Hello World","Hello Scala","Spark Spark Spark"))//4、扁平化val mapRdd: RDD[(String, Int)] = list.flatMap(_.split(" ")).map((_,1))//5、wordcountval reduceMapRdd: RDD[(String, Int)] = mapRdd.reduceByKey(_+_)//6、打印reduceMapRdd.foreach(println)}}

3)sortByKey([ascending], [numTasks])

作用:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

创建一个pairRDD,按照key的正序和倒序进行排序

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建listval listRdd: RDD[(Int, Int)] = sc.makeRDD(List((1, 3), (1, 2), (1, 4), (2, 3), (3, 6), (3, 8)))//4、升序val sortAsc: Array[(Int, Int)] = listRdd.sortByKey(true).collect()println("升序:" + sortAsc.mkString(","))//5、降序val sortDesc: Array[(Int, Int)] = listRdd.sortByKey(false).collect()println("降序:" + sortDesc.mkString(","))}}

4)join(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)val r1: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))val r2: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))val rsRdd: Array[(Int, (String, String))] = r1.join(r2).collect()rsRdd.foreach(println)}}

Action算子

1)reduce,作用:通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。

需求:创建一个RDD,将所有元素聚合得到结果。

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建RDDval rdd: RDD[Int] = sc.makeRDD(1 to 5)//4、reduceval rs: Int = rdd.reduce(_+_)//5、结果println(rs)}}

2) collect()

在驱动程序中,以数组的形式返回数据集的所有元素。

创建一个RDD,并将RDD内容收集到Driver端打印

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建RDDval lisrRdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5))val ints: Array[Int] = lisrRdd.collect()println(ints.mkString(","))}}

3) count()

返回RDD中元素的个数

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建RDDval rdd: RDD[Int] = sc.makeRDD(1 to 10)val rddCount: Long = rdd.count()println(rddCount)}}

4)first()

返回RDD中的第一个元素

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建RDDval rdd: RDD[Int] = sc.makeRDD(1 to 10)//4、返回RDD中第一个元素val firstNum: Int = rdd.first()println(firstNum)}}

5) take(n)

返回一个由RDD的前n个元素组成的数组

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建RDDval rdd: RDD[Int] = sc.makeRDD(1 to 10)val top3: Array[Int] = rdd.take(3)top3.foreach(println)}}

6) takeOrdered(n)

返回该RDD排序后的前n个元素组成的数组

def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建RDDval rdd: RDD[Int] = sc.makeRDD(List(5, 3, 2, 1, 7, 6))val top3: Array[Int] = rdd.takeOrdered(3)println(top3.mkString(","))}

7)countByKey()

针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建RDDval rdd: RDD[(Int, Int)] = sc.makeRDD(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)))//4、统计keyval countKey: collection.Map[Int, Long] = rdd.countByKey()//5、打印println(countKey)}}

8)foreach

打印

object SparkRdd {def main(args: Array[String]): Unit = {//1、创建本地spark配置文件val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//2、创建Spark上下文对象val sc = new SparkContext(config)//3、创建RDDval rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5))rdd.foreach(println)}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。