600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > Spark SQL之RDD转DataFrame

Spark SQL之RDD转DataFrame

时间:2021-12-27 15:37:36

相关推荐

Spark SQL之RDD转DataFrame

准备文件

首先准备好测试文件info.txt,内容如下:

1,vincent,202,sarah,193,sofia,294,monica,26

将RDD转成DataFrame

方式一:反射

可以使用反射来推断包含了特定数据类型的RDD的元数据

代码如下:

package cn.ac.iie.sparkimport org.apache.spark.sql.SparkSession/*** DataFrame和RDD的互操作*/object DataFrameRDDApp {def main(args: Array[String]): Unit = {val sparkSessionApp = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()// 将RDD转成DataFrameval rdd = sparkSessionApp.sparkContext.textFile("file:///E:/test/infos.txt")// 注意需要导入隐式转换import sparkSessionApp.implicits._val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()infoDF.show()sparkSessionApp.close()}case class Info(id:Int, name:String, age:Int){}}

当得到DataFrame之后就可以进行其他的相应操作了,例如进行过滤:infoDF.filter(infoDF.col("age") > 25).show():输出如下:

随后可以将DataFrame转成一张表。

我们可以通过infoDF.createOrReplaceTempView("infos")注册成一张表,好处就是可以直接通过SQL的方式进行处理。

infoDF.createOrReplaceTempView("infos")sparkSessionApp.sql("select * from infos where age > 25").show()

方式二:编程方式

当我们的Schema并不能提前定义时,就需要这种方式来实现了。这种方式必须要遵从如下三个步骤:

创建一个Rows的RDD定义一个Schema(使用StructType)使用createDataFrame将schema作用于Rows

代码试下如下:

package cn.ac.iie.sparkimport org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}import org.apache.spark.sql.{Row, SparkSession}/*** DataFrame和RDD的互操作*/object DataFrameRDDApp {def main(args: Array[String]): Unit = {val sparkSessionApp = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()// infoReflection(sparkSessionApp)program(sparkSessionApp)sparkSessionApp.close()}private def program(sparkSessionApp: SparkSession) = {val rdd = sparkSessionApp.sparkContext.textFile("file:///E:/test/infos.txt")val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))val structType = StructType(Array(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("age", IntegerType, true)))val infoDF = sparkSessionApp.createDataFrame(infoRDD, structType)infoDF.printSchema()infoDF.show()}private def infoReflection(sparkSessionApp: SparkSession) = {// 将RDD转成DataFrameval rdd = sparkSessionApp.sparkContext.textFile("file:///E:/test/infos.txt")// 注意需要导入隐式转换import sparkSessionApp.implicits._val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()infoDF.show()infoDF.filter(infoDF.col("age") > 25).show()infoDF.createOrReplaceTempView("infos")sparkSessionApp.sql("select * from infos where age > 25").show()}case class Info(id:Int, name:String, age:Int){}}

这种方式拿到DataFrame之后,依然可以进行其他的相关API操作。

两种方式的优缺点

DataFrame和RDD互操作的两种方式:

反射:case class。

这种方式事先需要知道你的字段、字段类型

编程方式:Row

如果第一种情况不能满足要求,无法事先知道字段与类型

优先考虑第一种方式。因为实现较为简单。

总结:DataFrame = RDD + Schema

RDD仅仅知道里面装的是什么对象(user),但是无法知道这个user里有哪些属性,以及属性的字段是什么类型的。所以我们直接处理RDD是有一定的困难,因此需要自己执行Schema表结构,将Schema作用于RDD中,就可以看做是一个表了。接下来就可以方便的进行操作了。

同时DataFrame优势:DataFrame底层使用了Catalyst进行优化。

DataFrame还支持text、json、parquet以及其他外部数据源格式。将外部数据源的数据注册到sparksql中,成为DataFrame,然后就可以使用DataFrame自身提供的API进行操作了。或者可以注册成一张表执行sql语句。执行自己的API或sql,最终形成的逻辑执行计划都是一样的。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。