一 例子说明
用spark的RDD与DataFrame两种方式实现如下功能
1.合并主特征与单特征
2.对标签进行过滤
3.标签与特征进行合并
4.按指定格式输出最终的数据
二 数据说明
包括三个文件:inst.txt(主特征)、feature.txt(单特征)、driver.txt(标签)
三 使用RDD方式进行操作
1.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""RDD-based pipeline: merge main/extra features, filter labels, emit output."""
import logging
import sys

from pyspark.sql import SparkSession

# One application-wide session; Hive support mirrors the cluster config.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext
2
# Input paths: main features, labels (driver), and the single extra feature.
org_inst_file = "./inst.txt"
label_input_file = "./driver.txt"
subscore_file = "./feature.txt"
3
def read_inst(line):
    """Split one 'id|features' line into a (cmid, inst_str) pair."""
    record_id, features = line.strip().split("|")
    return (record_id, features)

org_inst = sc.textFile(org_inst_file).map(read_inst)  # RDD of (cmid, inst_str)
print(org_inst.collect())
4
def read_label(line):
    """Extract (cmid, label) from a '|'-delimited driver record."""
    fields = line.strip().split("|")
    return (fields[0], fields[1])


def filter_label(line):
    """Keep records whose last field is '5' and second-to-last is '0'.

    With columns id|label|condition1|condition2 this means
    condition2 == "5" and condition1 == "0", matching the DataFrame version.
    """
    fields = line.strip().split("|")
    return fields[-1] == "5" and fields[-2] == "0"


label = sc.textFile(label_input_file).filter(filter_label).map(read_label)  # (cmid, label)
print(label.collect())
5
def read_subscore(line):
    """Split one 'id|score' line into a (cmid, score) pair."""
    record_id, score = line.strip().split("|")
    return (record_id, score)

subscore = sc.textFile(subscore_file).map(read_subscore)  # RDD of (cmid, score)
print(subscore.collect())
6
subscore_index = "4"def merge_subscore(values):# (cmid,(inst_str,subscore))inst_str = values[0]subscore = values[1]if subscore is None:return inst_strelse:return " ".join([inst_str, "{}:{}".format(subscore_index, subscore)])new_inst = org_inst.leftOuterJoin(subscore).mapValues(merge_subscore) #print(new_inst.collect())
7
def merge_inst_label(data):
    """Format one joined (cmid, (inst_str, label)) record as 'label\\tfeatures #cmid'."""
    record_id, (features, label_value) = data
    return "{}\t{} #{}".format(label_value, features, record_id)


inst_with_label = new_inst.join(label).map(merge_inst_label)
print(inst_with_label.collect())
8
# Persist the final 'label\tfeatures #cmid' lines as a text-file directory.
inst_with_label.saveAsTextFile("./output_rdd")
四 使用DataFrame方式进行操作
1.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""DataFrame-based implementation of the same merge/filter/output pipeline."""
import logging
import sys

from pyspark.sql import SparkSession

# Single application-wide session with Hive support enabled.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
2
# Same three inputs as the RDD version.
org_inst_file = "./inst.txt"
label_input_file = "./driver.txt"
subscore_file = "./feature.txt"
3
# Main-feature file 'id|index_with_feature', loaded as a two-column DataFrame.
df_inst = (
    spark.read.format('csv')
    .option('delimiter', '|')
    .load(org_inst_file)
    .toDF('id', 'index_with_feature')
)
df_inst.show()
df_inst.printSchema()
4
# Single-feature file 'id|feature'.
df_subscore = (
    spark.read.format('csv')
    .option('delimiter', '|')
    .load(subscore_file)
    .toDF('id', 'feature')
)
df_subscore.show()
df_subscore.printSchema()
5
# Left join keeps every main-feature row, even ids with no extra feature.
df_merge_feature = df_inst.join(df_subscore, on="id", how="left")
df_merge_feature.show()
6
# Label file columns: id | label | condition1 | condition2.
df_label = (
    spark.read.format('csv')
    .option('delimiter', '|')
    .load(label_input_file)
    .toDF('id', 'label', 'condition1', 'condition2')
)
df_label.show()
# Same predicate as the RDD filter_label: condition1 == 0 and condition2 == 5.
df_label = df_label.filter((df_label['condition1'] == 0) & (df_label['condition2'] == 5))
df_label.show()
7
# Inner join drops any id lacking either features or a surviving label.
df_merge = df_merge_feature.join(df_label, on="id", how="inner")
df_merge.show()
8
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

# Feature index assigned to the appended single score (same as RDD version).
subscore_index = "4"


def _append_subscore(inst_str, score):
    """Append ' index:score' to the feature string.

    Rows with no match in the left join carry a null `feature`; the original
    udf formatted that as '... 4:None'. Return the feature string unchanged
    instead, matching the RDD version's merge_subscore behaviour.
    """
    if score is None:
        return inst_str
    return "{} {}:{}".format(inst_str, subscore_index, score)


# Register under a distinct name instead of rebinding the Python function.
merge_feature_udf = udf(_append_subscore, StringType())

df_merge = df_merge.withColumn('inst_feature', merge_feature_udf("index_with_feature", 'feature'))
df_merge.show()

# Keep only the columns needed for the final output.
df_merge2 = df_merge[["id", "inst_feature", "label"]]
df_merge2.show()
9
# Write the trimmed DataFrame as tab-separated CSV without a header row.
file = "./output_dataframe"
df_merge2.write.csv(path=file, header=False, sep="\t", mode='overwrite')
# Also emit the RDD-style 'label\tfeatures #id' text format for comparison.
df_merge2.rdd.map(
    lambda row: str(row[2]) + "\t" + row[1] + " #" + row[0]
).saveAsTextFile('./output_dataframe2')