
PySpark-Recipes: Basic Operations on RDD Objects

Posted: 2018-12-10 11:35:15


Basic operations on RDD objects

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('learn_RDD').getOrCreate()

# Create an RDD from a list
List = [2.3, 3.4, 4.3, 2.4, 2.3, 4.0]
rdd = spark.sparkContext.parallelize(List, 2)  # numSlices -- the number of partitions of the new RDD
rdd.collect()

[2.3, 3.4, 4.3, 2.4, 2.3, 4.0]

# The first element, and the first two elements
rdd.first(), rdd.take(2)

(2.3, [2.3, 3.4])

# Number of partitions
rdd.getNumPartitions()

2
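
To see how parallelize actually split the list across those two partitions, glom() gathers each partition into its own list. A minimal sketch, assuming the same spark session and rdd as above:

# glom() returns one list per partition, making the effect of
# numSlices=2 visible (sketch; the exact split may differ)
rdd.glom().collect()
# e.g. [[2.3, 3.4, 4.3], [2.4, 2.3, 4.0]]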

Processing the elements of an RDD

temp = [59, 57.2, 53.6, 55.4, 51.8, 53.6, 55.4]
rdd = spark.sparkContext.parallelize(temp, 2)
rdd.collect()

[59, 57.2, 53.6, 55.4, 51.8, 53.6, 55.4]

# Transformation: map each element x to x*x/(x*2), i.e. x/2
def func(x):
    y = x * x / (x * 2)
    return y

new_rdd = rdd.map(func)
new_rdd.collect()

[29.5, 28.6, 26.8, 27.7, 25.9, 26.8, 27.7]

# Keep only values of at least 27
new_rdd.filter(lambda x: x >= 27).collect()

[29.5, 28.6, 27.7, 27.7]
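
Both map() and filter() are lazy transformations; the work only runs when an action such as collect() is called, so the two steps above can also be written as a single chain. A minimal sketch, assuming the same temperature rdd (note that x*x/(x*2) is simply x/2):

# Chained map + filter, equivalent to the two separate steps above (sketch)
rdd.map(lambda x: x / 2).filter(lambda x: x >= 27).collect()
# [29.5, 28.6, 27.7, 27.7]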

Basic data processing

# Build the student data: [id, year, score_1, score_2]
studentData = [["id_1", "year1", 62.08, 62.4], ["id_1", "year2", 75.94, 76.75],
               ["id_2", "year1", 68.26, 72.95], ["id_2", "year2", 85.49, 75.8],
               ["id_3", "year1", 75.08, 79.84], ["id_3", "year2", 54.98, 87.72],
               ["id_4", "year1", 50.03, 66.85], ["id_4", "year2", 71.26, 69.77],
               ["id_5", "year1", 52.74, 76.27], ["id_5", "year2", 50.39, 68.58],
               ["id_6", "year1", 74.86, 60.8], ["id_6", "year2", 58.29, 62.38],
               ["id_7", "year1", 63.95, 74.51], ["id_7", "year2", 66.69, 56.92]]
studentRDD = spark.sparkContext.parallelize(studentData, 4)
studentRDD.take(2)

[['id_1', 'year1', 62.08, 62.4], ['id_1', 'year2', 75.94, 76.75]]

# Compute the mean of the two score columns
temp_mean = studentRDD.map(lambda x: [x[0], x[1], (x[2] + x[3]) / 2])
temp_mean.take(2)

[['id_1', 'year1', 62.239999999999995], ['id_1', 'year2', 76.345]]

# Filter the year2 records
temp_mean.filter(lambda x: 'year2' in x).take(2)

[['id_1', 'year2', 76.345], ['id_2', 'year2', 80.645]]

# Top 3 students by second-year average score
temp_year2 = temp_mean.filter(lambda x: 'year2' in x)
temp_year2.sortBy(keyfunc=lambda x: -x[2]).take(3)

[['id_2', 'year2', 80.645], ['id_1', 'year2', 76.345], ['id_3', 'year2', 71.35]]

temp_year2.takeOrdered(num=3, key=lambda x: -x[2])

[['id_2', 'year2', 80.645], ['id_1', 'year2', 76.345], ['id_3', 'year2', 71.35]]
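
sortBy() shuffles and sorts the whole RDD before take(3) grabs the first rows, while takeOrdered() only keeps the top N elements, which is usually cheaper for a small N. Dropping the minus sign returns the lowest averages instead; a minimal sketch on the same temp_year2 RDD:

# Without negating the key, takeOrdered returns the three lowest
# second-year averages (sketch)
temp_year2.takeOrdered(num=3, key=lambda x: x[2])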

Set operations on RDDs

list_1 = ['RIN1', 'RIN2', 'RIN3', 'RIN4', 'RIN5', 'RIN6', 'RIN7']
list_2 = ['RIN3', 'RIN4', 'RIN7', 'RIN8', 'RIN9']
list_3 = ['RIN4', 'RIN8', 'RIN10', 'RIN11', 'RIN12']

# parallelize
rdd_1 = spark.sparkContext.parallelize(list_1, 2)
rdd_2 = spark.sparkContext.parallelize(list_2, 2)
rdd_3 = spark.sparkContext.parallelize(list_3, 2)

# union
union_12 = rdd_1.union(rdd_2)
union_12.collect()

['RIN1', 'RIN2', 'RIN3', 'RIN4', 'RIN5', 'RIN6', 'RIN7', 'RIN3', 'RIN4', 'RIN7', 'RIN8', 'RIN9']

union_12.union(rdd_3).collect()

['RIN1', 'RIN2', 'RIN3', 'RIN4', 'RIN5', 'RIN6', 'RIN7', 'RIN3', 'RIN4', 'RIN7', 'RIN8', 'RIN9', 'RIN4', 'RIN8', 'RIN10', 'RIN11', 'RIN12']

# Remove duplicates
union_123 = union_12.union(rdd_3)
union_123.distinct().collect()

['RIN1', 'RIN10', 'RIN12', 'RIN2', 'RIN3', 'RIN5', 'RIN8', 'RIN4', 'RIN9', 'RIN11', 'RIN6', 'RIN7']

# rdd_1 - rdd_2
rdd_1.subtract(rdd_2).collect()

['RIN1', 'RIN2', 'RIN5', 'RIN6']

# Intersection of rdd_1 and rdd_2
rdd_1.intersection(rdd_2).collect()

['RIN3', 'RIN4', 'RIN7']
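
These set operations chain naturally; for example, only 'RIN4' appears in all three lists. A minimal sketch using the same rdd_1, rdd_2, and rdd_3:

# Intersection across all three RDDs (sketch)
rdd_1.intersection(rdd_2).intersection(rdd_3).collect()
# ['RIN4']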

Computing summary statistics

temp = [12, 13, 15, 12, 11, 12, 11]
rdd = spark.sparkContext.parallelize(temp, 2)

# count
rdd.count()

7

# sum
rdd.sum()

86

# mean
rdd.mean()

12.285714285714286

# Variance (divides by N)
rdd.variance()

1.63265306122449

# Sample variance (divides by N-1)
rdd.sampleVariance()

1.904761904761905

# Standard deviation (divides by N)
rdd.stdev()

1.2777531299998799

# Sample standard deviation (divides by N-1)
rdd.sampleStdev()

1.3801311186847085

print(rdd.stats())
print(rdd.stats().asDict())
print(rdd.stats().mean())
print(rdd.stats().stdev())
print(rdd.stats().count())
print(rdd.stats().min())
print(rdd.stats().max())

(count: 7, mean: 12.285714285714286, stdev: 1.2777531299998799, max: 15.0, min: 11.0)
{'count': 7, 'mean': 12.285714285714286, 'sum': 86.0, 'min': 11.0, 'max': 15.0, 'stdev': 1.3801311186847085, 'variance': 1.904761904761905}
12.285714285714286
1.2777531299998799
7
11.0
15.0
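
Each call to rdd.stats() launches a new job, so it is usually better to keep the returned StatCounter and read the individual statistics from it. A minimal sketch on the same rdd:

# stats() computes all summary values in one pass; reusing the
# StatCounter avoids recomputing the RDD for every statistic (sketch)
st = rdd.stats()
print(st.count(), st.mean(), st.stdev(), st.sampleStdev())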
