600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > PySpark RDD操作

PySpark RDD操作

时间:2019-06-14 18:48:02

相关推荐

PySpark RDD操作

前提条件:

1、拥有Ubuntu16.04环境

2、Ubuntu下安装好Spark和PySpark

题目一:RDD创建

首先进入pyspark命令行

$ pyspark

(1)从文件中加载

从本地文件创建RDD>>> lines = sc.textFile("file:///home/hadoop/data.txt")>>> lines.collect()从HDFS文件创建RDD>>> lines1 = sc.textFile("hdfs://localhost:8020/user/hadoop/data.txt")>>> lines1.collect()

(2)通过集合创建

>>> list = [1,2,3,4,5]>>> rdd = sc.parallelize(list)>>> rdd.collect()

题目二:熟悉RDD操作

查找相关资料,完成以下转换和动作操作的练习。

转换:

转换是懒加载,只作记录,不作计算,只有遇到动作操作时才真正触发计算。

1、map()

2、flatMap()

3、filter()

4、groupByKey()

5、reduceByKey()

6、sortByKey()

动作

1、count()

2、collect()

3、first()

4、take(n)

5、reduce(func)

6、foreach(func)

7、saveAsTextFile(func)

题目三:RDD综合应用

1、随机一个文件生成10个人的年龄,第一个字段为ID,第二个字段为年龄,然后用RDD算出10个人的平均年龄

a、随机生成年龄

agegenerate.py

import random as rdfor i in range(10):num = rd.random()*100str1 = str(i) + " " + str(int(num))print(str1)

随机生成年龄结果如下

0 731 832 13 574 25 926 07 198 639 99

在Linux新建出一个age.txt文件,内容为以上随机生成年龄的结果。

b、RDD操作计算出平均年龄

由年龄文件生成rdd1>>> rdd1 = sc.textFile("age.txt")>>> rdd1.collect()['0 73', '1 83', '2 1', '3 57', '4 2', '5 92', '6 0', '7 19', '8 63', '9 99']RDD转换成(年龄,1)形式的RDD>>> rdd2 = rdd1.map(lambda line : line.split(" ")).map(lambda x:(int(x[1]),1))>>> rdd2.collect()[(73, 1), (83, 1), (1, 1), (57, 1), (2, 1), (92, 1), (0, 1), (19, 1), (63, 1), (99, 1)]计算年龄平均值(年龄的总和/年龄的个数)>>> rdd2.keys().reduce(lambda x,y : x+y)/rdd2.values().count()48.9

2、给定一组键值对(“spark”,3),(“hadoop”,6),(“hadoop”,4),(“spark”,5),key表示图书名称,value表示某天图书销量,请计算每种图书的每天平均销量。

结果是很显然是:(“spark”,4),(“hadoop”,5)

编程实现如下:

>>> rdd = sc.parallelize([("spark",2),("hadoop",6),("hadoop",4),("spark",6)])>>> rdd.mapValues(lambda x : (x,1)).reduceByKey(lambda x,y : (x[0]+y[0],x[1] + y[1])).mapValues(lambda x : (x[0] / x[1])).collect()

第2题内容来源于:http://dblab./blog/1706-2/,详细解析可参考来源链接。

题目四:RDD持久化

持久化:持久化后的RDD不需要重新计算,直接读取缓存中的数据,加快计算过程

RDD 可以使用 persist() 方法或 cache() 方法进行持久化。用unpersist()方法将持久化的RDD从缓存中移除。

>>> rdd = sc.parallelize(["b", "a", "c"])>>> rdd.persist()ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:194>>> rdd.is_cachedTrue

持久化后可通过浏览器监控页面(4040端口)查看持久化情况。注意:该过程可能会延时一段时间才出来。

去持久化:RDD使用unpersist()方法来去持久化

>>> rdd.unpersist()>>> rdd.is_cachedFalse

完成!enjoy it!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。