
Converting an RDD to a DataFrame in a Spark SQL Program

Inferring the Schema via Reflection

Spark SQL offers two ways to convert between an RDD and a DataFrame:

1. Reflection: infer the schema of an RDD that contains objects of a known type, and use reflection to convert it into a DataFrame of that type. This suits cases where the RDD's schema is known in advance.
2. Programmatic interface: interact with the RDD through the programming interface to build a schema and create the DataFrame dynamically, deciding the columns and their types at runtime.
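Both example programs below read a plain-text file person.txt in which each line holds an id, a name, and an age separated by single spaces. A minimal hypothetical sample (the records are made up, chosen so that the 'zhangsan' query and the age > 30 filters used later return rows):

1 zhangsan 28
2 lisi 32
3 wangwu 35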

1. Create a Maven project and add the dependencies

<properties>
    <scala.version>2.11.8</scala.version>
    <hadoop.version>2.7.4</hadoop.version>
    <spark.version>2.0.2</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass></mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
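Note that the shade plugin above leaves <mainClass> empty, so the main class has to be supplied at submit time. A hypothetical build-and-run session (the jar name depends on your artifactId and version):

mvn package
spark-submit --class cn.cheng.sql.InferringSchema --master local[2] target/your-artifact.jar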

2. Code implementation

Scala supports converting an RDD of case class instances into a DataFrame. The case class defines the schema: its parameter names are read via reflection and become the column names. Such an RDD can be efficiently converted to a DataFrame and registered as a table.

package cn.cheng.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * RDD to DataFrame: using the reflection mechanism
 */
// todo: define a case class Person
case class Person(id: Int, name: String, age: Int) extends Serializable

object InferringSchema {
  def main(args: Array[String]): Unit = {
    // todo: 1. build a SparkSession, specifying the appName and the master address
    val spark: SparkSession = SparkSession.builder()
      .appName("InferringSchema")
      .master("local[2]")
      .getOrCreate()
    // todo: 2. get the SparkContext from the SparkSession
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN") // set the log output level
    // todo: 3. load the data
    val dataRDD: RDD[String] = sc.textFile("D:\\person.txt")
    // todo: 4. split each line
    val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
    // todo: 5. associate the RDD with the Person class
    val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    // todo: 6. create the DataFrame; the implicit conversions must be imported
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF()

    // todo: ------------------- DSL-style operations start --------------
    // 1. show the DataFrame's data; displays 20 rows by default
    personDF.show()
    // 2. print the DataFrame's schema
    personDF.printSchema()
    // 3. print the number of records in the DataFrame
    println(personDF.count())
    // 4. print all of the DataFrame's column names
    personDF.columns.foreach(println)
    // 5. fetch the first row of the DataFrame
    println(personDF.head())
    // 6. show all values of the name column
    personDF.select("name").show()
    // 7. filter out the records where age is greater than 30
    personDF.filter($"age" > 30).show()
    // 8. count the records where age is greater than 30
    println(personDF.filter($"age" > 30).count())
    // 9. group by age and count the size of each group
    personDF.groupBy("age").count().show()
    // todo: ------------------- DSL-style operations end ----------------

    // todo: ------------------- SQL-style operations start --------------
    // todo: register the DataFrame as a table
    personDF.createOrReplaceTempView("t_person")
    // todo: run SQL statements against the table
    spark.sql("select * from t_person").show()
    spark.sql("select * from t_person where name='zhangsan'").show()
    spark.sql("select * from t_person order by age desc").show()
    // todo: ------------------- SQL-style operations end ----------------

    sc.stop()
  }
}
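For reference, with the hypothetical person.txt above, personDF.printSchema() should print something close to the following: reflection maps a Scala Int to a non-nullable integer column and a String to a nullable string column.

root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)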

Specifying the Schema Directly with StructType

1. When a case class cannot be defined in advance, a DataFrame can be created in the following three steps:

1. Convert the RDD into an RDD of Row objects.
2. Create a schema of type StructType that matches the structure of the Row RDD from step 1.
3. Apply the schema to the Row RDD via SparkSession's createDataFrame method to obtain the DataFrame.

2. Code implementation

The Maven dependencies are the same as in the reflection example above (part one).

package cn.cheng.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * RDD to DataFrame: building the DataFrame with an explicitly specified schema
 */
object SparkSqlSchema {
  def main(args: Array[String]): Unit = {
    // todo: 1. create a SparkSession, specifying the appName and master
    val spark: SparkSession = SparkSession.builder()
      .appName("SparkSqlSchema")
      .master("local[2]")
      .getOrCreate()
    // todo: 2. get the SparkContext
    val sc: SparkContext = spark.sparkContext
    // todo: 3. load the data
    val dataRDD: RDD[String] = sc.textFile("d:\\person.txt")
    // todo: 4. split each line
    val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
    // todo: 5. load the data into Row objects
    val personRDD: RDD[Row] = dataArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
    // todo: 6. create the schema
    val schema: StructType = StructType(Seq(
      StructField("id", IntegerType, false),
      StructField("name", StringType, false),
      StructField("age", IntegerType, false)
    ))
    // todo: 7. create the DataFrame from personRDD and the schema
    val personDF: DataFrame = spark.createDataFrame(personRDD, schema)
    // todo: 8. show the DataFrame's data with the DSL
    personDF.show()
    // todo: 9. register the DataFrame as a table
    personDF.createOrReplaceTempView("t_person")
    // todo: 10. run SQL statements
    spark.sql("select * from t_person").show()
    spark.sql("select count(*) from t_person").show()
    sc.stop()
  }
}
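The programmatic route is most useful when the columns are only known at runtime. A minimal sketch, reusing the imports above; the header string and the names header and dynamicSchema are illustrative, not from the original:

// build a schema dynamically from a header string that only arrives at runtime
val header = "id name age" // e.g. the first line of a CSV file, or a config value
val dynamicSchema: StructType = StructType(
  header.split(" ").map(name => StructField(name, StringType, nullable = true))
)
// the matching rows must then hold strings only, e.g. Row(x(0), x(1), x(2))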
