600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > java spark 遍历rdd_Spark入门(四):RDD基本操作

java spark 遍历rdd_Spark入门(四):RDD基本操作

时间:2024-08-23 20:36:05

相关推荐

java spark 遍历rdd_Spark入门(四):RDD基本操作

1.RDD转换

RDD的所有转换操作都不会进行真正的计算

1.1单个RDD转换操作

# 创建测试RDD

val rdd = sc.parallelize(Array("hello world","java","scala easy"))

# 1.map():遍历RDD中的每个元素,将返回值构成新的RDD,返回值类型可和原RDD不一致

val mapRdd = rdd.map(x => "map:"+x)

mapRdd.foreach(println)

# 输出

# map:hello world

# map:java

# map:scala easy

# 2.flatMap(): 遍历RDD中的每个元素,将返回的迭代器的所有内容构成新的 RDD

val flatMapRdd = rdd.flatMap(x => x.split(" "))

flatMapRdd.foreach(println)

# 输出

# hello

# world

# java

# scala

# easy

# 3.filter():遍历RDD中的每个元素,将匹配的元素构成新的RDD

val filterRdd = rdd.filter(x => x.contains("java"))

filterRdd.foreach(x => x.contains("java"))

filterRdd.foreach(println)

# 输出

# java

# 4.distinct():去重

val distinctRdd = flatMapRdd.distinct()

distinctRdd.foreach(println)

# scala

# hello

# easy

# java

# world

# 5.sample(withReplacement, fraction, [seed]):对 RDD 采样,以及是否替换

1.2 两个RDD转换操作

# 创建两个测试RDD

val rdd1 = sc.parallelize(Array("java","scala","spring"))

val rdd2 = sc.parallelize(Array("c++","java","spark"))

# 1.union():合并两个RDD

val unionRdd = rdd1.union(rdd2)

unionRdd.foreach(println)

# java

# scala

# spring

# c++

# java

# spark

# 2.intersection():求两个RDD元素的共同元素

val intersectionRdd = rdd1.intersection(rdd2)

intersectionRdd.foreach(println)

# java

# 3.subtract():移除RDD中的指定元素

val subtractRdd = rdd1.subtract(rdd2)

subtractRdd.foreach(println)

# 4.cartesian():求两个RDD元素的笛卡尔积

val cartesianRdd = rdd1.cartesian(rdd2)

cartesianRdd.foreach(println)

# (java,c++)

# (java,java)

# (java,spark)

# (scala,c++)

# (scala,java)

# (scala,spark)

# (spring,c++)

# (spring,java)

# (spring,spark)

2.行动操作

行动操作会真正触发RDD的计算操作

2.1 reduce()

它接收一个函数作为参数,这个

函数要操作两个 RDD 的元素类型的数据并返回一个同样类型的新元素

val rdd = sc.parallelize(Array(1,2,3,4))

# 计算所有元素的总和

println(rdd.reduce((x,y) => x+y))

# 10

2.2 fold()

fold() 和 reduce() 类似,接收一个与 reduce() 接收的函数签名相同的函数,再加上一个

“初始值”来作为每个分区第一次调用时的结果。(例如 +

对应的 0, * 对应的 1,或拼接操作对应的空列表)。

# 计算所有元素的综合

println(rdd.fold(0)((x,y) => x+y))

# 10

2.3 collect()

将整个RDD的内容返回

rdd.collect().foreach(print)

#1234

2.4 take(n)

返回RDD中的n个元素

rdd.take(2).foreach(print)

#12

2.5 top(n)

返回RDD中前n个元素,top()会使用数据的默认排序

rdd.top(3)

#123

2.6 count()

返回RDD中所有元素的个数

print(rdd.count())

4

2.7 countByValue()

返回个元素在RDD中出现的个数

rdd.countByValue().foreach(println+)

(1,1)

(3,1)

(2,1)

(4,1)

2.8 takeSample(withReplacement, num, [seed])

从 RDD 中返回任意num个元素

rdd.takeSample(false,3)

2.9 foreach

对 RDD 中的每个元素使用给

定的函数

2.10 aggregate(zeroValue)(seqOp, combOp)

和 reduce() 相似,可以返回不同类型的函数

val result = rdd.aggregate((0, 0))((x, y) =>(x._1 + y, x._2 + 1),(part1, part2) =>(part1._1 + part2._1,part1._2 + part2._2))

print(result)

(10,4)

参数说明

((0, 0))

# 第一步:指定初始值

((x, y) =>(x._1 + y, x._2 + 1),

# 2:分片计算

# x为初始值(0,0),y为RDD元素(1,2,3,4)

# 假设RDD分布在两个分片上(1,2)为一个分片,(3,4)为一个分片

# 则计算结果如下:

# 分片1:

# 0+1,0+1

# 1+2,1+1

# 分片1结果:(3,2)

# 分片2:

# 0+3,0+1

# 3+4,1+1

# 分片2结果:(7,2)

(part1, part2) =>(part1._1 + part2._1,part1._2 + part2._2))

# 第三步:合并分片数据

# 3+7,2+2

# 输出结果(10,4)

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。