Spark RDD Scala语言编程
RDD(Resilient Distributed Dataset)是一个不可变的分布式对象集合, 每个rdd被分为多个分区, 这些分区运行在集群的不同节点上。rdd支持两种类型的操作:转化(trainsformation)和行动(action), Spark只会惰性计算rdd, 也就是说, 转化操作的rdd不会立即计算, 而是在其第一次遇到行动操作时才去计算, 如果想在多个行动操作中复用一个rdd, 可以使用RDD.persist()让Spark把这个rdd缓存下来。
0. 初始化SparkContext
import org.apache.spark.{SparkConf, SparkContext}val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("spark-rdd-demo"))
1. 创建RDD
Spark提供了2种创建RDD的方式:
1.1 读取外部数据集
val lines = sc.parallelize(List("Java", "Scala", "C++"))
1.2 在驱动器程序中对一个集合进行并行化
val lines = sc.textFile("hdfs://dash-dev:9000/input/test.txt")
2. RDD操作
2.1 转化操作
RDD的转化操作是返回新RDD的操作, 常用转化操作总结如下:
表1: 对一个数据为{1,2,3,3}的RDD进行基本的转化操作
表2: 对数据分别为{1,2,3}和{2,3,4}RDD进行针对2个RDD的转化操作
2.2 行动操作
RDD的行动操作会把最终求得的结果返回驱动器程序, 或者写入外部存储系统中。
表3: 对一个数据为{1,2,3,3}的RDD进行基本RDD的行动操作
2.3 向Spark传递函数
Spark的大部分转化和行动操作都要依赖于用户传递的函数来计算, 当传递的对象是某个对象的成员, 或者包含了对某个对象中一个字段的引用时(如self.field), Spark就会把整个对象发送到工作节点上--这比你本意想传递的东西大太多了!替代的方案是,把你需要的字段从对象中拿出来放到一个局部变量中, 然后传递这个局部变量:
class SearchFunctions(val query: String) {def isMatch(s: String): Boolean = {s.contains(query)}def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {// 问题:"isMatch"表示"this.isMatch", 因此会传递整个thisrdd.map(isMatch)}def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {// 问题: "query"表示"this.query", 因此会传递整个thisrdd.map(x => x.split(query))}def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {// 安全:只把我们需要的字段拿出来放入局部变量中val localQuery = this.queryrdd.map(x => x.split(localQuery))}}
另外, 要注意的是, Spark要求我们传入的函数及其应用的数据是可序列化的(实现了Java的Serializable接口), 否则会出现NotSerializableException。
作者 @wusuopubupt
11月11日