600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > RDD转换为DataFrame

RDD转换为DataFrame

时间:2022-11-12 00:44:48

相关推荐

RDD转换为DataFrame

spark官方提供了两种方法实现从RDD转换到DataFrame。第一种方法是利用反射机制来推断包含特定类型对象的Schema,这种方式适用于对已知的数据结构的RDD转换;第二种方法通过编程接口构造一个 Schema ,并将其应用在已知的RDD数据中。

(一)反射机制推断Schema

在Windows系统下开发Scala 代码,可以使用本地环境测试,因此首先需要在本地磁

盘准备文本数据文件,这里将HD FS中的/spark/person.txt文件下载到本地D:/spark person.txt路径下。从文件4-1可!以看出,当前数据文件共3列,可以非常容易地分析出这3列分别是编号、姓名、年龄。但是计算机无法像人一样直观地感受字段的实际含义,因此需要通过反射机制来推断包含特定类型对象的Schema信息。

接下来打开IDEA开发工具,创建名为 spark01 的Maven工程,讲解实现反射机制推断Schema的开发工具。

1、添加 Spark SQL 依赖,代码如下:

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.3.2</version>

</dependency>

2、编写代码:

文件名:CaseClassSchema.scala

package cn.itcastimport org.apache.spark.SparkContextimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.{DataFrame, Row,SparkSession}//定义样例类case class Person(id:Int,name:String,age:Int)object CaseClassSchema {def main(args: Array[String]): Unit = {//构建SparkSessionval spark : SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()//获取SparkContextval sc : SparkContext = spark.sparkContext//设置日志打印级别sc.setLogLevel("WARN")//读取文件val data:RDD[Array[String]]=sc.textFile("D://spark//person.txt").map(x=>x.split(" "))//将RDD与样例类关联val personRdd : RDD[Person] = data.map(x=>Person(x(0).toInt,x(1),x(2).toInt))//获取DataFrame//手动导入隐式转换import spark.implicits._val personDF : DataFrame = personRdd.toDF//------------DSL风格操作开始----------// 显示DataFrame的数据,默认显示20行personDF.show()//显示DataFrame的schema信息personDF.printSchema()//统计DataFrame中年龄大于30岁的人println(personDF.filter($"age">30).count())//-----------------DSL风格操作结束------------//----------------SQL风格操作开始-------------//将DataFrame注册成表personDF.createOrReplaceTempView("t_person")spark.sql("select * from t_person").show()spark.sql("select * from t_person where name='kuli'").show()//---------------------SQL风格操作结束--------------------//关闭资源操作sc.stop()spark.stop()}}

运行结果:

(二)编程方式定义Schema

当case类不能提前定义的时候,就需要采用编程方式定义Schema信息,定义DataFrame主要包含3个步骤,具体如下:

(1)创建一个Row对象结构的RDD;(2)基于StructType类型创建Schema;

(3)通过SparkSession提供的createDataFrame(()方法来拼接Schema。

根据上述步骤,创建 SparkSqlSchema.scala文件,使用编程方式定义Schema信息的具体代码如文件所示。

文件名:SparkSqlSchema.scala

package cn.itcastimport org.apache.spark.SparkContextimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.{DataFrame, Row, SparkSession}import org.apache.spark.sql.types.{StructType, StructField, StringType,IntegerType}object SparkSqlSchema {def main(args: Array[String]): Unit = {// 创建SparkSeeionval spark : SparkSession = SparkSession.builder().appName("SparkSqlSchema").master("local[2]").getOrCreate()// 获取sparkContext对象val sc : SparkContext = spark.sparkContext//设置日志打印级别sc.setLogLevel("WARN")//加载数据val dataRDD : RDD[String] = sc.textFile("D://spark//person.txt")// 切分每一行val dataArrayRDD : RDD[Array[String]] = dataRDD.map(_.split(" "))//加载数据到Row对象中val personRDD : RDD[Row] = dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))//创建Schemaval schema : StructType = StructType(Seq(StructField("id",IntegerType,false),StructField("name",StringType,false),StructField("age",IntegerType,false)))//利用personRDD与Schema创建DataFrameval personDF : DataFrame = spark.createDataFrame(personRDD,schema)//DSL操作显示DataFrame的数据结果personDF.show()//将DataFrame注册成表personDF.createOrReplaceTempView("t_person")//sql语句操作spark.sql("select * from t_person").show()//关闭资源sc.stop()spark.stop()}}

运行结果:

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。