RDD对象的基本操作
# Entry point: build (or reuse) a SparkSession for the RDD examples below.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('learn_RDD').getOrCreate()
# Create an RDD from a Python list.
# (Renamed `List` -> `data`: capitalized names are reserved for classes/types.)
data = [2.3, 3.4, 4.3, 2.4, 2.3, 4.0]
# numSlices -- the number of partitions of the new RDD
rdd = spark.sparkContext.parallelize(data, 2)
rdd.collect()
[2.3, 3.4, 4.3, 2.4, 2.3, 4.0]
# Get the first element, and the first two elements.
rdd.first(), rdd.take(2)
(2.3, [2.3, 3.4])
# Number of partitions of the RDD.
rdd.getNumPartitions()
2
对RDD对象中元素的处理
# A small list of temperature readings, split over 2 partitions.
temp = [59, 57.2, 53.6, 55.4, 51.8, 53.6, 55.4]
rdd = spark.sparkContext.parallelize(temp, 2)
rdd.collect()
[59, 57.2, 53.6, 55.4, 51.8, 53.6, 55.4]
# Transform each element with map().
def func(x):
    """Return x*x/(x*2), which mathematically reduces to x/2.

    The expression is kept as written so the float results match the
    original notebook output exactly.
    """
    y = x * x / (x * 2)
    return y

new_rdd = rdd.map(func)
new_rdd.collect()
[29.5, 28.6, 26.8, 27.7, 25.9, 26.8, 27.7]
# Keep only the halved values that are at least 27.
new_rdd.filter(lambda x: x >= 27).collect()
[29.5, 28.6, 27.7, 27.7]
基本的数据处理
# Build a small score table: each row is [student_id, year, score_1, score_2].
studentData = [
    ["id_1", "year1", 62.08, 62.4],
    ["id_1", "year2", 75.94, 76.75],
    ["id_2", "year1", 68.26, 72.95],
    ["id_2", "year2", 85.49, 75.8],
    ["id_3", "year1", 75.08, 79.84],
    ["id_3", "year2", 54.98, 87.72],
    ["id_4", "year1", 50.03, 66.85],
    ["id_4", "year2", 71.26, 69.77],
    ["id_5", "year1", 52.74, 76.27],
    ["id_5", "year2", 50.39, 68.58],
    ["id_6", "year1", 74.86, 60.8],
    ["id_6", "year2", 58.29, 62.38],
    ["id_7", "year1", 63.95, 74.51],
    ["id_7", "year2", 66.69, 56.92],
]
studentRDD = spark.sparkContext.parallelize(studentData, 4)
studentRDD.take(2)
[['id_1', 'year1', 62.08, 62.4], ['id_1', 'year2', 75.94, 76.75]]
# Average the two score columns for each row: [id, year, mean_score].
temp_mean = studentRDD.map(lambda x: [x[0], x[1], (x[2] + x[3]) / 2])
temp_mean.take(2)
[['id_1', 'year1', 62.239999999999995], ['id_1', 'year2', 76.345]]
# Keep only the 'year2' rows.
temp_mean.filter(lambda x: 'year2' in x).take(2)
[['id_1', 'year2', 76.345], ['id_2', 'year2', 80.645]]
# Top 3 students by year-2 mean score (sort descending via negated key).
temp_year2 = temp_mean.filter(lambda x: 'year2' in x)
temp_year2.sortBy(keyfunc=lambda x: -x[2]).take(3)
[['id_2', 'year2', 80.645],['id_1', 'year2', 76.345],['id_3', 'year2', 71.35]]
# Same top-3 result via takeOrdered, which avoids a full sort of the RDD.
temp_year2.takeOrdered(num=3, key=lambda x: -x[2])
[['id_2', 'year2', 80.645],['id_1', 'year2', 76.345],['id_3', 'year2', 71.35]]
RDD上的集合操作
# Set-style operations on RDDs: build three overlapping ID lists.
list_1 = ['RIN1', 'RIN2', 'RIN3', 'RIN4', 'RIN5', 'RIN6', 'RIN7']
list_2 = ['RIN3', 'RIN4', 'RIN7', 'RIN8', 'RIN9']
list_3 = ['RIN4', 'RIN8', 'RIN10', 'RIN11', 'RIN12']
# parallelize each list into its own 2-partition RDD
rdd_1 = spark.sparkContext.parallelize(list_1, 2)
rdd_2 = spark.sparkContext.parallelize(list_2, 2)
rdd_3 = spark.sparkContext.parallelize(list_3, 2)
# union keeps duplicates (it is concatenation, not a mathematical union)
union_12 = rdd_1.union(rdd_2)
union_12.collect()
['RIN1','RIN2','RIN3','RIN4','RIN5','RIN6','RIN7','RIN3','RIN4','RIN7','RIN8','RIN9']
# Chain another union on top; duplicates are still kept.
union_12.union(rdd_3).collect()
['RIN1','RIN2','RIN3','RIN4','RIN5','RIN6','RIN7','RIN3','RIN4','RIN7','RIN8','RIN9','RIN4','RIN8','RIN10','RIN11','RIN12']
# Deduplicate the three-way union (order of the result is not guaranteed).
union_123 = union_12.union(rdd_3)
union_123.distinct().collect()
['RIN1','RIN10','RIN12','RIN2','RIN3','RIN5','RIN8','RIN4','RIN9','RIN11','RIN6','RIN7']
# Set difference: elements of rdd_1 not present in rdd_2.
rdd_1.subtract(rdd_2).collect()
['RIN1', 'RIN2', 'RIN5', 'RIN6']
# Intersection of rdd_1 and rdd_2.
rdd_1.intersection(rdd_2).collect()
['RIN3', 'RIN4', 'RIN7']
计算一些统计值
# A small integer sample for the statistics examples below.
temp = [12, 13, 15, 12, 11, 12, 11]
rdd = spark.sparkContext.parallelize(temp, 2)
# count
rdd.count()
7
# sum of all elements
rdd.sum()
86
# arithmetic mean
rdd.mean()
12.285714285714286
# population variance (divides by N)
rdd.variance()
1.63265306122449
# sample variance (divides by N-1)
rdd.sampleVariance()
1.904761904761905
# population standard deviation (divides by N)
rdd.stdev()
1.2777531299998799
# sample standard deviation (divides by N-1)
rdd.sampleStdev()
1.3801311186847085
# All summary statistics in one pass: compute the StatCounter once
# instead of re-running rdd.stats() (a full job) for every field.
st = rdd.stats()
print(st)           # repr shows the *population* stdev
print(st.asDict())  # NOTE: asDict() reports the *sample* stdev/variance
print(st.mean())
print(st.stdev())
print(st.count())
print(st.min())
print(st.max())
(count: 7, mean: 12.285714285714286, stdev: 1.2777531299998799, max: 15.0, min: 11.0){'count': 7, 'mean': 12.285714285714286, 'sum': 86.0, 'min': 11.0, 'max': 15.0, 'stdev': 1.3801311186847085, 'variance': 1.904761904761905}12.2857142857142861.2777531299998799711.015.0