600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > pyspark RDD详细教程

pyspark RDD详细教程

时间:2022-01-24 13:57:20

相关推荐

pyspark RDD详细教程

Spark的核心是RDD(Resilient Distributed Dataset)即弹性分布式数据集,属于一种分布式的内存系统的数据集应用,这些元素在多个节点上运行和操作,以便在集群上进行并行处理。Spark主要优势就是来自RDD本身的特性,RDD能与其他系统兼容,可以导入外部存储系统的数据集,例如,HDFS、HBase或者其他Hadoop数据源

官方API

1、RDD的基本运算

2、基本RDD“转换”运算

首先我们要导入PySpark并初始化Spark的上下文环境:

初始化

from pyspark.sql.session import SparkSessionspark=SparkSession.builder.master("local[*]").appName("duqu").getOrCreate()sc = spark.sparkContext

创建RDD

创建RDD的两种方法:

1:读取一个数据集(Windows本地,服务器本地,hdfs)

#Windows本地row = sc.textFile('./data/pytest.txt')#服务器本地#file:// 表示从本地文件系统读row = sc.textFile('file:///chenhong/Python/pytest.txt')#hdfs#在路径前面加上hdfs://row = sc.textFile('hdfs:///chenhong/data/pytest.txt')

2: 读取一个集合(列表/元组/混合/子元素的长度也不一定要一样)

我们使用parallelize方法创建一个RDD:

#子元素长度不一样时,不能直接转dataframedata = [('Alex','male',3),('Nancy','female',6),['Jack','male',1,80]]rdd = sc.parallelize(data)#输出[('Alex', 'male', 3), ('Nancy', 'female', 6), ['Jack', 'male',1, 80]]

RDD转换为Python数据类型

#RDD类型的数据可以使用collect方法转换为python的数据类型:print (intRDD.collect())print (stringRDD.collect())#输出为:[3, 1, 2, 5, 5]['APPLE', 'Orange', 'Grape', 'Banana','Apple']

map和mapValues

map运算可以通过传入的函数,将每一个元素经过函数运算产生另外一个RDD。

mapValues是在不改变key值的情况下,把rdd键值对中的value传给function

#比如下面的代码中,将intRDD中的每个元素加1之后返回,并转换为python数组输出:print (intRDD.map(lambda x:x+1).collect())#输出为:[4, 2, 3, 6, 6]###########################mapValues#######################rdd = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes",'pig'])])def f(x):return len(x)print(rdd.mapValues(f).collect())#输出[('a', 3), ('b', 2)]

filter运算

filter可以用于对RDD内每一个元素进行筛选,并产生另外一个RDD。

#下面的例子中,我们筛选intRDD中数字小于3的元素,同事筛选stringRDD中包含ra的字符串:print (intRDD.filter(lambda x: x<3).collect())print (stringRDD.filter(lambda x:'ra' in x).collect())#输出为:[1, 2]['Orange', 'Grape']

distinct运算

distinct运算会删除重复的元素

#比如我们去除intRDD中的重复元素1:print (intRDD.distinct().collect())#输出为:[1, 2, 3, 5]

randomSplit运算

randomSplit 运算将整个集合以随机数的方式按照比例分为多个RDD

#比如按照0.4和0.6的比例将intRDD分为两个RDDsRDD = intRDD.randomSplit([0.4,0.6])print (len(sRDD))print (sRDD[0].collect())print (sRDD[1].collect())#输出为:2[3, 1][2, 5, 5]

groupBy和groupByKey

groupBy运算可以按照传入匿名函数的规则,将数据分为多个Array。

groupByKey将rdd的键值对按照Key放在一起,后面一般接一个function函数

#下面的代码将intRDD分为偶数和奇数:result = intRDD.groupBy(lambda x : x % 2).collect()#这边y是一个迭代器,用sorted或者list来迭代结果print (sorted([(x, sorted(y)) for (x, y) in result]))#输出为:[(0, [2]), (1, [1, 3, 5, 5])]##############################groupByKey############################rdd=sc.parallelize([('a',1),('b',100),('a',200)])a=sorted(rdd.groupByKey().mapValues(len).collect())print(a)a=sorted(rdd.groupByKey().mapValues(list).collect())print(a)#输出[('a',2),('b',1)][('a',[1,200],('b',[100]))]

map和flatMap

map()是将文件每一行进行操作,数量不会改变

flatMap()是将所有元素进行操作,数量只会大于或者等于初始数量

from pyspark import SparkConf, SparkContextsc = SparkContext("local[*]", "First App")RDD = sc.parallelize(['i am a student','i am a teacher'])print(RDD.collect())print(RDD.count())rdd1 = RDD.map(lambda x:x.split(' '))print(rdd1.collect())print(rdd1.count())rdd2 = RDD.flatMap(lambda x:x.split(' '))print(rdd2.collect())print(rdd2.count())#输出['i am a student', 'i am a teacher']2[['i', 'am', 'a', 'student'], ['i', 'am', 'a', 'teacher']]2['i', 'am', 'a', 'student', 'i', 'am', 'a', 'teacher']8

reduce与reduceByKey

reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。reduceByKey就是对元素为键值对的RDD中Key相同的元素的Value进行reduce操作,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的键值对。(去键重)

from pyspark import SparkConf, SparkContextsc = SparkContext("local[*]", "First App")##################reduce##############################RDD = sc.parallelize([1,2,3,4])print(RDD.collect())rdd1 = RDD.reduce(lambda x,y:x*y)print(type(rdd1))print(rdd1)#输出[1, 2, 3, 4]<class 'int'>24##################reduceByKey##############################RDD = sc.parallelize(['i am a student','i am a teacher'])rdd1 = RDD.flatMap(lambda x:x.split(' ')).map(lambda x:(x,1))print(rdd1.collect())rdd2 = rdd1.reduceByKey(lambda x,y:x+y)print(rdd2.collect())#输出[('i', 1), ('am', 1), ('a', 1), ('student', 1), ('i', 1), ('am', 1), ('a', 1), ('teacher', 1)][('i', 2), ('a', 2), ('teacher', 1), ('am', 2), ('student', 1)]

zip

zip() 合并俩个列表,并按索引对应值

keylist = [1,2,3,4]valuelist = ['a','b','c','d']pair = zip(keylist,valuelist)rdd = sc.parallelize(pair)print(rdd.collect())print(rdd.keys().collect())print(rdd.values().collect())#输出[(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')][1, 2, 3, 4]['a', 'b', 'c', 'd']

rdd转dataframe

from pyspark.sql.session import SparkSessionfrom pyspark.sql.types import StructField, StructType, StringTypespark = SparkSession.builder.master("local").appName("haha").getOrCreate()sc = spark.sparkContextdata = [('Alex','male',3),('Nancy','female',6),['Jack','male',9]] # mixedrdd_ = sc.parallelize(data)'''schema'string': StringType()'date': TimestampType()'double': DoubleType()'int': IntegerType()'none': NullType()'''schema = StructType([# true代表不为空StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("num", StringType(), True)])df = spark.createDataFrame(rdd_, schema=schema)

3、多个RDD转换运算

RDD也支持执行多个RDD的运算,这里,我们定义三个RDD:

intRDD1 = sc.parallelize([3,1,2,5,5])intRDD2 = sc.parallelize([5,6])intRDD3 = sc.parallelize([2,7])

并集运算

#可以使用union函数进行并集运算:print (intRDD1.union(intRDD2).union(intRDD3).collect())#输出为:[3, 1, 2, 5, 5, 5, 6, 2, 7]

交集运算

#可以使用intersection进行交集运算print (intRDD1.intersection(intRDD2).collect())#输出为:[5]

差集运算

#可以使用subtract函数进行差集运算:print (intRDD1.subtract(intRDD2).collect())#由于两个RDD的重复部分为5,所以输出为:[2, 1, 3]

笛卡尔积运算

#可以使用cartesian函数进行笛卡尔乘积运算:print (intRDD1.cartesian(intRDD2).collect())#由于两个RDD分别有5个元素和2个元素,所以返回结果有10各元素:[(3, 5), (3, 6), (1, 5), (1, 6), (2, 5), (2, 6), (5, 5), (5, 6), (5, 5), (5, 6)]

4、基本“动作”运算

读取元素

#可以使用下列命令读取RDD内的元素,这是Actions运算,所以会马上执行:print (intRDD.first()) #取第一条数据print (intRDD.take(2)) #取前两条数据print (intRDD.takeOrdered(3)) #升序排列,并取前3条数据print (intRDD.takeOrdered(3,lambda x:-x)) #降序排列,并取前3条数据#输出为:3[3, 1][1, 2, 3][5, 5, 3]

统计功能

#可以将RDD内的元素进行统计运算:print (intRDD.stats())#统计print (intRDD.min())#最小值print (intRDD.max())#最大值print (intRDD.stdev())#标准差print (intRDD.count())#计数print (intRDD.sum())#求和print (intRDD.mean())#平均

5、RDD Key-Value基本“转换”运算

Spark RDD支持键值对运算,Key-Value运算时mapreduce运算的基础,本节介绍RDD键值的基本“转换”运算。

初始化

我们用元素类型为tuple元组的数组初始化我们的RDD,这里,每个tuple的第一个值将作为键,而第二个元素将作为值。

kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])

得到key和value值

#可以使用keys和values函数分别得到RDD的键数组和值数组:print (kvRDD1.keys().collect())print (kvRDD1.values().collect())#输出为:[3, 3, 5, 1][4, 6, 6, 2]

筛选元素

可以按照键进行元素筛选,也可以通过值进行元素筛选,和之前的一样,使用filter函数,这里要注意的是:虽然RDD中是以键值对形式存在,但是本质上还是一个二元组,二元组的第一个值代表键,第二个值代表值。

#1:按照键进行筛选,我们筛选键值小于5的数据:print (kvRDD1.filter(lambda x:x[0] < 5).collect())#输出为:[(3, 4), (3, 6), (1, 2)]#2:将x[0]替换为x[1]就是按照值进行筛选,我们筛选值小于5的数据:print (kvRDD1.filter(lambda x:x[1] < 5).collect())#输出为:[(3, 4), (1, 2)]

值运算

我们可以使用mapValues方法处理value值

#下面的代码将value值进行了平方处理:print (kvRDD1.mapValues(lambda x:x**2).collect())#输出为:[(3, 16), (3, 36), (5, 36), (1, 4)]

按照key排序

可以使用sortByKey按照key进行排序,传入参数的默认值为true,是按照从小到大排序,也可以传入参数false,表示从大到小排序:

print (kvRDD1.sortByKey().collect())print (kvRDD1.sortByKey(True).collect())print (kvRDD1.sortByKey(False).collect())#输出为:[(1, 2), (3, 4), (3, 6), (5, 6)][(1, 2), (3, 4), (3, 6), (5, 6)][(5, 6), (3, 4), (3, 6), (1, 2)]

合并相同key值的数据

使用reduceByKey函数可以对具有相同key值的数据进行合并。

#RDD中存在(3,4)和(3,6)两条key值均为3的数据,他们将被合为一条数据:print (kvRDD1.reduceByKey(lambda x,y:x+y).collect())#输出为:[(1, 2), (3, 10), (5, 6)]

6、多个RDD Key-Value“转换”运算

初始化

#首先我们初始化两个k-v的RDD:kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])kvRDD2 = sc.parallelize([(3,8)])

内连接运算

join运算可以实现类似数据库的内连接,将两个RDD按照相同的key值join起来

#kvRDD1与kvRDD2的key值唯一相同的是3,kvRDD1中有两条key值为3的数据(3,4)和(3,6),而kvRDD2中只有一条key值为3的数据(3,8),所以join的结果是(3,(4,8)) 和(3,(6,8)):print (kvRDD1.join(kvRDD2).collect())#输出为:[(3, (4, 8)), (3, (6, 8))]

左外连接

使用leftOuterJoin可以实现类似数据库的左外连接,如果kvRDD1的key值对应不到kvRDD2,就会显示None

print (kvRDD1.leftOuterJoin(kvRDD2).collect())#输出为:[(1, (2, None)), (3, (4, 8)), (3, (6, 8)), (5, (6, None))]

右外连接

使用rightOuterJoin可以实现类似数据库的右外连接,如果kvRDD2的key值对应不到kvRDD1,就会显示None

print (kvRDD1.rightOuterJoin(kvRDD2).collect())#输出为:[(3, (4, 8)), (3, (6, 8))]

删除相同key值数据

使用subtractByKey运算会删除相同key值得数据:

print (kvRDD1.subtractByKey(kvRDD2).collect())#结果为:[(1, 2), (5, 6)]

7、Key-Value“动作”运算

读取数据

可以使用下面的几种方式读取RDD的数据:

print (kvRDD1.first())#读取第一条数据print (kvRDD1.take(2))#读取前两条数据print (kvRDD1.first()[0])#读取第一条数据的key值print (kvRDD1.first()[1])#读取第一条数据的value值#输出为:(3, 4)[(3, 4), (3, 6)]34

按key值统计:

使用countByKey函数可以统计各个key值对应的数据的条数:

print (kvRDD1.countByKey().collect())#输出为:defaultdict(<type 'int'>, {1: 1, 3: 2, 5: 1})

lookup查找运算

使用lookup函数可以根据输入的key值来查找对应的Value值:

print (kvRDD1.lookup(3))#输出为:[4, 6]

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。
相关阅读
PySpark RDD操作

PySpark RDD操作

2019-04-15

PySpark | RDD

PySpark | RDD

2021-01-23

pyspark rdd 基本操作

pyspark rdd 基本操作

2021-05-24

pyspark  DataFrame 转RDD

pyspark DataFrame 转RDD

2022-08-12