600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > pyspark rdd 基本操作

pyspark rdd 基本操作

时间:2023-12-27 12:08:50

相关推荐

pyspark rdd 基本操作

pyspark rdd 基本操作

原文链接

#!/usr/bin/env python3# -*- coding: utf-8 -*-"""Created on Fri Mar 8 17:09:44 @author: lg"""from pyspark import SparkContext ,SparkConfconf=SparkConf().setAppName("miniProject").setMaster("local[4]")#conf=SparkConf().setAppName("lg").setMaster("spark://192.168.10.182:7077")sc = SparkContext(conf=conf)#创建RDD#接下来我们使用parallelize方法创建一个RDD:intRDD = sc.parallelize([3,1,2,5,5])stringRDD = sc.parallelize(['Apple','Orange','Grape','Banana','Apple'])#RDD转换为Python数据类型#RDD类型的数据可以使用collect方法转换为python的数据类型:print (intRDD.collect())print (stringRDD.collect())#map运算可以通过传入的函数,将每一个元素经过函数运算产生另外一个RDD。#比如下面的代码中,将intRDD中的每个元素加1之后返回,并转换为python数组输出:print (intRDD.map(lambda x:x+1).collect())#filter运算#filter可以用于对RDD内每一个元素进行筛选,并产生另外一个RDD。#下面的例子中,我们筛选intRDD中数字小于3的元素,同事筛选stringRDD中包含ra的字符串:print (intRDD.filter(lambda x: x<3).collect())print (stringRDD.filter(lambda x:'ra' in x).collect())#distinct运算#distinct运算会删除重复的元素,比如我们去除intRDD中的重复元素1:print (intRDD.distinct().collect())#randomSplit运算#randomSplit 运算将整个集合以随机数的方式按照比例分为多个RDD,比如按照0.4和0.6的比例将intRDD分为两个RDD,并输出:#sRDD = intRDD.randomSplit([0.4,0.6])print (len(sRDD))print (sRDD[0].collect())print (sRDD[1].collect())#groupBy运算#groupBy运算可以按照传入匿名函数的规则,将数据分为多个Array。比如下面的代码将intRDD分为偶数和奇数:result = intRDD.groupBy(lambda x : x % 2).collect()print (sorted([(x, sorted(y)) for (x, y) in result]))#3、多个RDD转换运算#RDD也支持执行多个RDD的运算,这里,我们定义三个RDD:intRDD1 = sc.parallelize([3,1,2,5,5])intRDD2 = sc.parallelize([5,6])intRDD3 = sc.parallelize([2,7])#并集运算#可以使用union函数进行并集运算:print (intRDD1.union(intRDD2).union(intRDD3).collect())#交集运算#可以使用intersection进行交集运算:print (intRDD1.intersection(intRDD2).collect())#差集运算#可以使用subtract函数进行差集运算:print (intRDD1.subtract(intRDD2).collect())#笛卡尔积运算#可以使用cartesian函数进行笛卡尔乘积运算:print (intRDD1.cartesian(intRDD2).collect())#基本“动作”运算#读取元素#可以使用下列命令读取RDD内的元素,这是Actions运算,所以会马上执#取第一条数据print (intRDD.first())#取前两条数据print (intRDD.take(2))#升序排列,并取前3条数据print (intRDD.takeOrdered(3))#降序排列,并取前3条数据print (intRDD.takeOrdered(3,lambda x:-x))#统计功能#可以将RDD内的元素进行统计运算:#统计print (intRDD.stats())#最小值print (intRDD.min())#最大值print (intRDD.max())#标准差print (intRDD.stdev())#计数print (intRDD.count())#求和print (intRDD.sum())#平均print (intRDD.mean())#5、RDD Key-Value基本“转换”运算#Spark RDD支持键值对运算,Key-Value运算时mapreduce运算的基础,本节介绍RDD键值的基本“转换”运算。#初始化#我们用元素类型为tuple元组的数组初始化我们的RDD,这里,每个tuple的第一个值将作为键,而第二个元素将作为值kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])#得到key和value值#可以使用keys和values函数分别得到RDD的键数组和值数组:print (kvRDD1.keys().collect())print (kvRDD1.values().collect())#筛选元素#可以按照键进行元素筛选,也可以通过值进行元素筛选,和之前的一样,使用filter函数,#这里要注意的是,虽然RDD中是以键值对形式存在,但是本质上还是一个二元组,二元组的第一个值代表键,#第二个值代表值,所以按照如下的代码既可以按照键进行筛选,我们筛选键值小于5的数据:print (kvRDD1.filter(lambda x:x[0] < 5).collect())print (kvRDD1.filter(lambda x:x[1] < 5).collect())#值运算#我们可以使用mapValues方法处理value值,下面的代码将value值进行了平方处理:print (kvRDD1.mapValues(lambda x:x**2).collect())#按照key排序#可以使用sortByKey按照key进行排序,传入参数的默认值为true,#是按照从小到大排序,也可以传入参数false,表示从大到小排序:print (kvRDD1.sortByKey().collect())print (kvRDD1.sortByKey(True).collect())print (kvRDD1.sortByKey(False).collect())#合并相同key值的数据#使用reduceByKey函数可以对具有相同key值的数据进行合并。比如下面的代码,#由于RDD中存在(3,4)和(3,6)两条key值均为3的数据,他们将被合为一条数据:print (kvRDD1.reduceByKey(lambda x,y:x+y).collect())#多个RDD Key-Value“转换”运算初始化kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])kvRDD2 = sc.parallelize([(3,8)])#内连接运算#join运算可以实现类似数据库的内连接,将两个RDD按照相同的key值join起来,#kvRDD1与kvRDD2的key值唯一相同的是3,kvRDD1中有两条key值为3的数据(3,4)和(3,6),#而kvRDD2中只有一条key值为3的数据(3,8),所以join的结果是(3,(4,8)) 和(3,(6,8)):print (kvRDD1.join(kvRDD2).collect())#左外连接#使用leftOuterJoin可以实现类似数据库的左外连接,如果kvRDD1的key值对应不到kvRDD2,就会显示None#使用leftOuterJoin可以实现类似数据库的左外连接,如果kvRDD1的key值对应不到kvRDD2,就会显示Noneprint (kvRDD1.leftOuterJoin(kvRDD2).collect())#右外连接#使用rightOuterJoin可以实现类似数据库的右外连接,如果kvRDD2的key值对应不到kvRDD1,就会显示Noneprint (kvRDD1.rightOuterJoin(kvRDD2).collect())#删除相同key值数据#使用subtractByKey运算会删除相同key值得数据:print (kvRDD1.subtractByKey(kvRDD2).collect())#Key-Value“动作”运算#读取数据#可以使用下面的几种方式读取RDD的数据:#读取第一条数据print (kvRDD1.first())#读取前两条数据print (kvRDD1.take(2))#读取第一条数据的key值print (kvRDD1.first()[0])#读取第一条数据的value值print (kvRDD1.first()[1])#按key值统计:#使用countByKey函数可以统计各个key值对应的数据的条数:#print (kvRDD1.countByKey().collect())#lookup查找运算#使用lookup函数可以根据输入的key值来查找对应的Value值:print (kvRDD1.lookup(3))#持久化操作#spark RDD的持久化机制,可以将需要重复运算的RDD存储在内存中,以便大幅提升运算效率,有两个主要的函数:#持久化#使用persist函数对RDD进行持久化:kvRDD1.persist()#使用unpersist函数对RDD进行持久化:kvRDD1.unpersist()

posted on -03-08 18:34 luoganttcc 阅读(...) 评论(...) 编辑 收藏

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。
相关阅读
PySpark RDD操作

PySpark RDD操作

2024-01-14

PySpark | RDD

PySpark | RDD

2021-04-07

pyspark RDD详细教程

pyspark RDD详细教程

2021-11-16

pyspark  DataFrame 转RDD

pyspark DataFrame 转RDD

2021-06-19