600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > Spark RDD基本操作

Spark RDD基本操作

时间:2022-02-17 18:33:29

相关推荐

Spark RDD基本操作

Spark RDD Scala语言编程

RDD(Resilient Distributed Dataset)是一个不可变的分布式对象集合, 每个rdd被分为多个分区, 这些分区运行在集群的不同节点上。rdd支持两种类型的操作:转化(trainsformation)和行动(action), Spark只会惰性计算rdd, 也就是说, 转化操作的rdd不会立即计算, 而是在其第一次遇到行动操作时才去计算, 如果想在多个行动操作中复用一个rdd, 可以使用RDD.persist()让Spark把这个rdd缓存下来。

0. 初始化SparkContext

import org.apache.spark.{SparkConf, SparkContext}val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("spark-rdd-demo"))

1. 创建RDD

Spark提供了2种创建RDD的方式:

1.1 读取外部数据集

val lines = sc.parallelize(List("Java", "Scala", "C++"))

1.2 在驱动器程序中对一个集合进行并行化

val lines = sc.textFile("hdfs://dash-dev:9000/input/test.txt")

2. RDD操作

2.1 转化操作

RDD的转化操作是返回新RDD的操作, 常用转化操作总结如下:

表1: 对一个数据为{1,2,3,3}的RDD进行基本的转化操作

表2: 对数据分别为{1,2,3}和{2,3,4}RDD进行针对2个RDD的转化操作

2.2 行动操作

RDD的行动操作会把最终求得的结果返回驱动器程序, 或者写入外部存储系统中。

表3: 对一个数据为{1,2,3,3}的RDD进行基本RDD的行动操作

2.3 向Spark传递函数

Spark的大部分转化和行动操作都要依赖于用户传递的函数来计算, 当传递的对象是某个对象的成员, 或者包含了对某个对象中一个字段的引用时(如self.field), Spark就会把整个对象发送到工作节点上--这比你本意想传递的东西大太多了!替代的方案是,把你需要的字段从对象中拿出来放到一个局部变量中, 然后传递这个局部变量:

class SearchFunctions(val query: String) {def isMatch(s: String): Boolean = {s.contains(query)}def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {// 问题:"isMatch"表示"this.isMatch", 因此会传递整个thisrdd.map(isMatch)}def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {// 问题: "query"表示"this.query", 因此会传递整个thisrdd.map(x => x.split(query))}def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {// 安全:只把我们需要的字段拿出来放入局部变量中val localQuery = this.queryrdd.map(x => x.split(localQuery))}}

另外, 要注意的是, Spark要求我们传入的函数及其应用的数据是可序列化的(实现了Java的Serializable接口), 否则会出现NotSerializableException。

作者 @wusuopubupt

11月11日

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。