
[Spark] PySpark Introductory Tutorial: RDD vs. DataFrame by Example

Posted: 2021-04-02 12:29:52


I. Example Description

Implement the following steps with both of Spark's APIs, RDD and DataFrame:

1. Merge the main features with a single sub-score feature

2. Filter the labels

3. Join the labels with the features

4. Write out the final data in the required format

II. Data Description

Three input files are used: inst.txt (the main features), driver.txt (the labels plus two filter fields), and feature.txt (the single sub-score feature).
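To make the formats concrete, here are hypothetical sample rows (an illustration inferred from the code below; the original post does not include sample data):

inst.txt  (id|features):                       1001|1:0.5 2:0.3
feature.txt  (id|sub-score):                   1001|0.9
driver.txt  (id|label|condition1|condition2):  1001|1|0|5

With these rows, the final output should be the single line "1\t1:0.5 2:0.3 4:0.9 #1001", i.e. label, tab, the feature string with the sub-score appended at feature index 4, then the id as a trailing comment.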

III. Using the RDD API

1. Create the SparkSession and get the SparkContext:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext
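The builder above assumes a cluster with Hive configured. For a quick local test (my suggestion, not part of the original post), a plain local session works just as well:

# Minimal local session for testing; no Hive support needed.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("rdd-vs-dataframe-demo") \
    .getOrCreate()
sc = spark.sparkContext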

2. Define the input file paths:

org_inst_file = "./inst.txt"
label_input_file = "./driver.txt"
subscore_file = "./feature.txt"

3. Read the main feature file into an RDD of (cmid, inst_str) pairs:

def read_inst(line):
    cmid, inst_str = line.strip().split("|")
    return (cmid, inst_str)

org_inst = sc.textFile(org_inst_file).map(read_inst)  # (cmid, inst_str)
print(org_inst.collect())

4. Read the label file, keeping only the records that pass the filter:

def read_label(line):
    contents = line.strip().split("|")
    cmid = contents[0]
    label = contents[1]
    return (cmid, label)

def filter_label(line):
    # Keep only rows whose last field is "5" and second-to-last field is "0".
    contents = line.strip().split("|")
    condition1 = contents[-1]
    condition2 = contents[-2]
    return condition1 == "5" and condition2 == "0"

label = sc.textFile(label_input_file).filter(filter_label).map(read_label)  # (cmid, label)
print(label.collect())

5. Read the single-feature (sub-score) file:

def read_subscore(line):
    cmid, score = line.strip().split("|")
    return (cmid, score)

subscore = sc.textFile(subscore_file).map(read_subscore)  # (cmid, subscore)
print(subscore.collect())

6. Left-join the sub-scores onto the main features and splice the sub-score into the feature string (the sub-score gets feature index 4):

subscore_index = "4"

def merge_subscore(values):
    # values is (inst_str, subscore); subscore is None when the id
    # had no match in the sub-score file (leftOuterJoin semantics).
    inst_str, subscore = values
    if subscore is None:
        return inst_str
    return " ".join([inst_str, "{}:{}".format(subscore_index, subscore)])

new_inst = org_inst.leftOuterJoin(subscore).mapValues(merge_subscore)  # (cmid, merged_inst_str)
print(new_inst.collect())
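With the hypothetical sample rows from section II, new_inst.collect() would return:

[('1001', '1:0.5 2:0.3 4:0.9')]

An id present in inst.txt but absent from feature.txt would keep its original feature string, since leftOuterJoin pairs it with None.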

7. Inner-join the merged features with the labels and format the output lines:

def merge_inst_label(data):
    cmid = data[0]
    inst_str = data[1][0]
    label_str = data[1][1]
    # Output format: "label<TAB>features #id"
    return label_str + "\t" + inst_str + " #{}".format(cmid)

inst_with_label = new_inst.join(label).map(merge_inst_label)
print(inst_with_label.collect())
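Again with the sample rows, inst_with_label.collect() would return the single formatted line:

['1\t1:0.5 2:0.3 4:0.9 #1001']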

8. Save the result to a text directory:

inst_with_label.saveAsTextFile("./output_rdd")
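Note that saveAsTextFile writes a directory of part-* files, one per partition, rather than a single file. If a single output file is wanted for a small result (a usage note of mine, not from the original post), collapse the RDD to one partition first:

# Collapse to one partition so the output directory holds a single part file.
inst_with_label.coalesce(1).saveAsTextFile("./output_rdd")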

IV. Using the DataFrame API

1. Create the SparkSession:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

2. Define the input file paths:

org_inst_file = "./inst.txt"
label_input_file = "./driver.txt"
subscore_file = "./feature.txt"

3. Load the main feature file as a DataFrame:

df_inst = spark.read.format('csv') \
    .option('delimiter', '|') \
    .load(org_inst_file) \
    .toDF('id', 'index_with_feature')
df_inst.show()
df_inst.printSchema()

4. Load the sub-score file:

df_subscore = spark.read.format('csv') \
    .option('delimiter', '|') \
    .load(subscore_file) \
    .toDF('id', 'feature')
df_subscore.show()
df_subscore.printSchema()

5. Left-join the sub-scores onto the main features:

df_merge_feature = df_inst.join(df_subscore, on="id", how="left")
df_merge_feature.show()

6. Load the label file and filter it:

df_label = spark.read.format('csv') \
    .option('delimiter', '|') \
    .load(label_input_file) \
    .toDF('id', 'label', 'condition1', 'condition2')
df_label.show()

# Keep only rows with condition1 == 0 and condition2 == 5.
df_label = df_label.filter((df_label['condition1'] == 0) & (df_label['condition2'] == 5))
df_label.show()
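One subtlety: without an explicit schema, spark.read loads every CSV column as a string, so condition1 == 0 only works because Spark casts the operands before comparing. A more explicit variant (my suggestion, not from the original) compares against string literals:

df_label = df_label.filter((df_label['condition1'] == '0') & (df_label['condition2'] == '5'))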

7. Inner-join the merged features with the labels:

df_merge = df_merge_feature.join(df_label, on="id", how="inner")
df_merge.show()

8. Merge the two feature columns with a UDF and keep the output columns:

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

subscore_index = "4"

def fc2(a, b):
    # Match the RDD version: after the left join, feature can be null,
    # in which case the original feature string is kept unchanged.
    if b is None:
        return a
    return "{} {}:{}".format(a, subscore_index, b)

fc2_udf = udf(fc2, StringType())
df_merge = df_merge.withColumn('inst_feature', fc2_udf('index_with_feature', 'feature'))
df_merge.show()

df_merge2 = df_merge[["id", "inst_feature", "label"]]
df_merge2.show()
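A Python UDF forces every row through the Python interpreter. The same merge can be expressed with built-in column functions, which stay in the JVM; this is an alternative sketch, not the original author's code:

from pyspark.sql.functions import col, concat_ws, lit, when

# Append "4:<feature>" only when a sub-score exists, mirroring the RDD version.
df_merge = df_merge.withColumn(
    'inst_feature',
    when(col('feature').isNull(), col('index_with_feature'))
    .otherwise(concat_ws(' ', col('index_with_feature'),
                         concat_ws(':', lit(subscore_index), col('feature')))))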

9. Write the result:

# Write as a tab-separated CSV.
output_path = "./output_dataframe"
df_merge2.write.csv(path=output_path, header=False, sep="\t", mode='overwrite')

# Or produce the same "label<TAB>features #id" lines as the RDD version.
df_merge2.rdd.map(lambda x: str(x[2]) + "\t" + x[1] + " #" + x[0]).saveAsTextFile('./output_dataframe2')
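Dropping to the RDD just to format strings works, but the same text output can be produced while staying in the DataFrame API; a sketch using the same column names, not from the original post:

from pyspark.sql.functions import concat, concat_ws, lit

# Build "label<TAB>features #id" as a single string column, then write plain text.
df_out = df_merge2.select(
    concat_ws("\t", "label",
              concat("inst_feature", lit(" #"), "id")).alias("value"))
df_out.write.mode("overwrite").text("./output_dataframe2")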
