
Spark Hands-On Case Tutorial



Reference code: GitHub - GoAlers/Bigdata_project: e-commerce big data project - recommendation system (Java and Scala)

Project setup

pom.xml reference

<dependencies>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>
  <!-- Spark core -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>1.6.0</version>
  </dependency>
  <!-- Spark SQL -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.0</version>
  </dependency>
  <!-- Spark SQL on Hive -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.6.0</version>
  </dependency>
  <!-- Spark Streaming -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.0</version>
    <!--<scope>provided</scope>-->
  </dependency>
  <!-- Spark Streaming + Kafka -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.0</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.5</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-compiler</artifactId>
    <version>2.10.5</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-reflect</artifactId>
    <version>2.10.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-it</artifactId>
    <version>1.2.2</version>
  </dependency>
  <dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.6.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
  </dependency>
  <!-- Redis client -->
  <dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.6.1</version>
  </dependency>
  <!-- MySQL JDBC driver -->
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
  </dependency>
  <dependency>
    <groupId>com.google.collections</groupId>
    <artifactId>google-collections</artifactId>
    <version>1.0</version>
  </dependency>
</dependencies>

<repositories>
  <repository>
    <id>central</id>
    <name>Maven Repository Switchboard</name>
    <layout>default</layout>
    <url>/maven2</url>
    <snapshots>
      <enabled>false</enabled>
    </snapshots>
  </repository>
</repositories>

<build>
  <sourceDirectory>src/main/java</sourceDirectory>
  <testSourceDirectory>src/test/java</testSourceDirectory>
  <plugins>
    <!-- When the project contains both Java and Scala code, maven-scala-plugin
         lets both be compiled and packaged together. -->
    <plugin>
      <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <version>2.15.2</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <!-- JDK version used for Maven compilation -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.3</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <encoding>UTF-8</encoding>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>3.0.0</version>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-surefire-plugin</artifactId>
      <version>2.10</version>
      <configuration>
        <skip>true</skip>
      </configuration>
    </plugin>
  </plugins>
</build>
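One thing worth noting in this pom: every Spark artifact carries the _2.10 suffix, so it must stay aligned with the scala-library version (2.10.5 here); mismatched Scala binary versions usually only surface as runtime errors. As a quick sanity check after the build, a minimal sketch like the following (the object name is just an example) prints what is actually on the classpath:

import org.apache.spark.{SparkConf, SparkContext}

object VersionCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("version-check"))
    println("Spark version: " + sc.version)                     // expect 1.6.0
    println("Scala version: " + util.Properties.versionString)  // expect version 2.10.5
    sc.stop()
  }
}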

1 demo1 -- WordCount

Create a data folder under the project root, then create a world.csv file in it:

hello,spark
hello,scala,hadoop
hello,hdfs
hello,spark,hadoop
hello

Scala version --- SparkWC.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark word count
 */
object SparkWC {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordcount").setMaster("local")
    val sc = new SparkContext(conf)
    sc.textFile("./data/world.csv")
      .flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
      .foreach(println)
    sc.stop()

    // The same job, step by step:
    // // conf sets the Spark application name and the run mode (master)
    // val conf = new SparkConf()
    // conf.setAppName("wordcount")
    // conf.setMaster("local")
    // // SparkContext is the single entry point to the Spark cluster
    // val sc = new SparkContext(conf)
    //
    // val lines: RDD[String] = sc.textFile("./data/world.csv")
    // val words: RDD[String] = lines.flatMap(line => {
    //   line.split(",")
    // })
    // val pairWords: RDD[(String, Int)] = words.map(word => { new Tuple2(word, 1) })
    // val result: RDD[(String, Int)] = pairWords.reduceByKey((v1: Int, v2: Int) => { v1 + v2 })
    // result.foreach(one => {
    //   println(one)
    // })
  }
}
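foreach(println) prints the counts in whatever order the reduce produces them. If a sorted result written to disk is preferred, the pipeline in main can be swapped for a variant like this sketch (reusing the same sc; the output path is only an example):

sc.textFile("./data/world.csv")
  .flatMap(_.split(","))
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)            // highest count first
  .saveAsTextFile("./data/wordcount_output")  // writes part-* files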

Test: running it locally prints each (word, count) pair to the console.

Java version --- SparkWordCount.java

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

public class SparkWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("wc");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("./data/world.csv");
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(","));
            }
        });
        JavaPairRDD<String, Integer> pairWords = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        JavaPairRDD<String, Integer> result = pairWords.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> tp) throws Exception {
                System.out.println(tp);
            }
        });
        sc.stop();
    }
}

Test: the Java version produces the same counts.

2 demo2 -- join operators

Code and tests (the expected output of each demo is kept in the comments):

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object Taiko extends App {
  val conf = new SparkConf().setMaster("local").setAppName("wc")
  val sc = new SparkContext(conf)

  // data for demo1-5: start
  val nameRDD: RDD[(String, Int)] = sc.parallelize(List[(String, Int)](("zhangsan", 18), ("lisi", 19), ("wangwu", 20), ("zhaoliu", 21)))
  val sourceRDD: RDD[(String, Int)] = sc.parallelize(List[(String, Int)](("zhangsan", 100), ("lisi", 200), ("wangwu", 300), ("tianqi", 400)))
  // data for demo1-5: end

  // demo1 join
  //val result: RDD[(String, (Int, Int))] = nameRDD.join(sourceRDD)
  //result.foreach(println)
  /* demo1 output
   * (zhangsan,(18,100))
   * (wangwu,(20,300))
   * (lisi,(19,200))
   */

  // demo2 leftOuterJoin
  //val result: RDD[(String, (Int, Option[Int]))] = nameRDD.leftOuterJoin(sourceRDD)
  //result.foreach(println)
  /* demo2 output
   * (zhangsan,(18,Some(100)))
   * (wangwu,(20,Some(300)))
   * (zhaoliu,(21,None))
   * (lisi,(19,Some(200)))
   */
  /* result.foreach(res => {
       val name = res._1
       val v1 = res._2._1
       val v2 = res._2._2.getOrElse("no score")
       println(s"name=$name,age=$v1,score=$v2")
     }) */
  /* demo2 output
   * name=zhangsan,age=18,score=100
   * name=wangwu,age=20,score=300
   * name=zhaoliu,age=21,score=no score
   * name=lisi,age=19,score=200
   */

  // demo3 rightOuterJoin
  //val result: RDD[(String, (Option[Int], Int))] = nameRDD.rightOuterJoin(sourceRDD)
  //result.foreach(println)
  /* demo3 output
   * (zhangsan,(Some(18),100))
   * (wangwu,(Some(20),300))
   * (tianqi,(None,400))
   * (lisi,(Some(19),200))
   */

  // demo4 fullOuterJoin
  //val result: RDD[(String, (Option[Int], Option[Int]))] = nameRDD.fullOuterJoin(sourceRDD)
  //result.foreach(println)
  /* demo4 output
   * (zhangsan,(Some(18),Some(100)))
   * (wangwu,(Some(20),Some(300)))
   * (zhaoliu,(Some(21),None))
   * (tianqi,(None,Some(400)))
   * (lisi,(Some(19),Some(200)))
   */

  // demo5 union
  //val result: RDD[(String, Int)] = nameRDD.union(sourceRDD)
  //result.foreach(println)
  /* demo5 output
   * (zhangsan,18)
   * (lisi,19)
   * (wangwu,20)
   * (zhaoliu,21)
   * (zhangsan,100)
   * (lisi,200)
   * (wangwu,300)
   * (tianqi,400)
   */

  // demo6 partitioning
  val nameRDD1: RDD[(String, Int)] = sc.parallelize(List[(String, Int)](("zhangsan", 18), ("lisi", 19), ("wangwu", 20), ("zhaoliu", 21)), 3)
  val sourceRDD1: RDD[(String, Int)] = sc.parallelize(List[(String, Int)](("zhangsan", 100), ("lisi", 200), ("wangwu", 300), ("tianqi", 400)), 4)
  val p1: Int = nameRDD1.getNumPartitions
  val p2: Int = sourceRDD1.getNumPartitions
  //val result: RDD[(String, (Int, Int))] = nameRDD1.join(sourceRDD1)
  //val p3: Int = result.getNumPartitions
  //println(s"p1:$p1,p2:$p2,p3:$p3")
  /* p1:3,p2:4,p3:4 -- join follows the RDD with the larger number of partitions */
  //val result: RDD[(String, Int)] = nameRDD1.union(sourceRDD1)
  //val p3: Int = result.getNumPartitions
  //println(s"p1:$p1,p2:$p2,p3:$p3")
  /* p1:3,p2:4,p3:7 -- no data actually moves, the partitions are simply appended */

  // demo7 intersection and subtract
  val rdd1: RDD[Int] = sc.parallelize(List[Int](1, 2, 3))
  val rdd2: RDD[Int] = sc.parallelize(List[Int](2, 3, 5))
  //rdd1.intersection(rdd2).foreach(println)
  /*
   * 3
   * 2
   */
  //rdd1.subtract(rdd2).foreach(println)
  /* 1 */
  //rdd2.subtract(rdd1).foreach(println)
  /* 5 */

  // demo8 mapPartitions: process a whole partition at a time to avoid repeating expensive setup
  val rdd: RDD[String] = sc.parallelize(List[String]("hello1", "hello2", "hello3", "hello4"), 2)
  /* rdd.map(one => {
       println("opening database connection...")
       println(s"inserting record: $one")
       println("closing database connection...")
       one + "!"
     }).count() */
  /* the plain map version opens a new database connection for every single record:
   * opening database connection...
   * inserting record: hello1
   * closing database connection...
   * opening database connection...
   * inserting record: hello2
   * closing database connection...
   * opening database connection...
   * inserting record: hello3
   * closing database connection...
   * opening database connection...
   * inserting record: hello4
   * closing database connection...
   */
  rdd.mapPartitions(iter => {
    val list = new ListBuffer[String]
    println("opening database connection...")
    while (iter.hasNext) {
      val str = iter.next()
      println(s"inserting record: $str")
      list += str
    }
    println("closing database connection...")
    list.iterator
  }).count()
  /*
   * opening database connection...
   * inserting record: hello1
   * inserting record: hello2
   * closing database connection...
   * opening database connection...
   * inserting record: hello3
   * inserting record: hello4
   * closing database connection...
   */
}
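If the per-partition work in demo8 is a pure side effect (for example writing each record to a database) and the returned RDD is never used, foreachPartition is the more natural choice: it is an action, so no count() is needed and no intermediate ListBuffer has to be built. A minimal sketch reusing the rdd from demo8, with the connection again only simulated by println:

rdd.foreachPartition(iter => {
  println("opening database connection...")               // once per partition
  iter.foreach(str => println(s"inserting record: $str"))
  println("closing database connection...")
})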

3 demo3 -- validation on a Spark standalone cluster and on YARN

user_item_score.txt

1 100001 5
1 100002 3
1 100003 4
3 100001 2
3 100002 5
2 100001 1
2 100002 2
2 100003 4
2 100004 5
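Each line holds three space-separated fields: user id, item id and score. A quick way to sanity-check the parsing before running the full job is a one-liner like this (a sketch, assuming an existing sc such as the one created in the class below):

sc.textFile("./data/user_item_score.txt")
  .map(_.split(" "))
  .map(f => (f(0), f(1), f(2).toDouble))
  .take(3)
  .foreach(println)   // e.g. (1,100001,5.0)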

userwatchlist

package com.test.scala.spark

import org.apache.spark.{SparkConf, SparkContext}

object userwatchlist {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("userwatchlist test")
    val sc = new SparkContext(conf)
    val input_path = sc.textFile("./data/user_item_score.txt")
    val output_path = "./data/userwatchlist_output"

    // keep only records with a score greater than 2
    val data = input_path.filter(x => {
      val fields = x.split(" ")
      fields(2).toDouble > 2
    }).map(x => {
      /*
       * raw data: user item score
       * -> (user, (item1, score1)), (user, (item2, score2))
       * -> (user, ((item1, score1), (item2, score2)))
       * -> target: user -> item item item
       */
      val fields = x.split(" ")
      (fields(0).toString, (fields(1).toString, fields(2).toString))
    }).groupByKey().map(x => {
      val userid = x._1
      val item_score_tuple_list = x._2
      // sort by score, descending
      val tmp_arr = item_score_tuple_list.toArray.sortWith(_._2 > _._2)
      var watchlen = tmp_arr.length
      // keep at most the top 5
      if (watchlen > 5) {
        watchlen = 5
      }
      val strbuf = new StringBuilder
      for (i <- 0 until watchlen) {
        strbuf ++= tmp_arr(i)._1
        strbuf.append(":")
        strbuf ++= tmp_arr(i)._2
        strbuf.append(" ")
      }
      userid + "\t" + strbuf
    })
    data.saveAsTextFile(output_path)
  }
}

3.1 Local validation results
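saveAsTextFile writes a directory of part-* files. One way to look at the local result without leaving Spark is to read that directory back, e.g. with a sketch like this (assuming the output path used above):

sc.textFile("./data/userwatchlist_output").foreach(println)
// each line: userid <TAB> item:score item:score ...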

3.2 Validation on the Spark standalone cluster: modify the Scala class

Package the project with Maven, upload the data file and the jar to the Linux host, then upload the data file to HDFS. Since userwatchlist above hard-codes local ./data paths, point its input and output paths at HDFS (or take them from args, as the cf job in demo4 does) before packaging.

Create run.sh:

/usr/local/src/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \
--master spark://master:7077 \
--num-executors 2 \
--executor-memory 1g \
--executor-cores 1 \
--driver-memory 1g \
--class com.test.scala.spark.userwatchlist /root/test_spark/test-1.0-SNAPSHOT.jar

Run it: bash run.sh

3.3 Validation on the Hadoop (YARN) cluster: modify run.sh

/usr/local/src/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \
--master yarn-cluster \
--num-executors 2 \
--executor-memory 1g \
--executor-cores 1 \
--driver-memory 1g \
--class com.test.scala.spark.userwatchlist /root/test_spark/test-1.0-SNAPSHOT.jar

Delete the output path from the previous run first (not needed if you skipped the Spark standalone validation): hadoop fs -rmr /userwatchlist_output

4 demo4 -- CF algorithm (item-to-item collaborative filtering)

package com.test.scala.spark

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer
import scala.math._

object cf {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("CF")
    val sc = new SparkContext(conf)
    val input_path = args(0).toString
    val output_path = args(1).toString
    val lines = sc.textFile(input_path)
    val max_prefs_per_user = 20
    val topn = 5

    // step1
    val ui_rdd = lines.map { x =>
      val fields = x.split(" ")
      (fields(0).toString, (fields(1).toString, fields(2).toDouble))
    }.groupByKey().flatMap { x =>
      val user = x._1
      val item_score_list = x._2
      var is_arr = item_score_list.toArray
      var is_list_len = is_arr.length
      if (is_list_len > max_prefs_per_user) {
        is_list_len = max_prefs_per_user
      }
      // transpose: (user, (item, score)) -> (item, (user, score))
      var i_us_arr = new ArrayBuffer[(String, (String, Double))]()
      for (i <- 0 until is_list_len) {
        i_us_arr += ((is_arr(i)._1, (user, is_arr(i)._2)))
      }
      i_us_arr
    }.groupByKey().flatMap { x =>
      // normalize each item's score vector to unit length
      val item = x._1
      val u_list = x._2
      val us_arr = u_list.toArray
      var sum: Double = 0.0
      for (i <- 0 until us_arr.length) {
        sum += pow(us_arr(i)._2, 2)
      }
      sum = sqrt(sum)
      var u_is_arr = new ArrayBuffer[(String, (String, Double))]()
      for (i <- 0 until us_arr.length) {
        u_is_arr += ((us_arr(i)._1, (item, us_arr(i)._2 / sum)))
      }
      u_is_arr
      /* test output with the run arguments set:
       * (2,CompactBuffer((100002,0.3244428422615251), (100003,0.7071067811865475), (100004,1.0), (100001,0.18257418583505536)))
       * (3,CompactBuffer((100002,0.8111071056538127), (100001,0.3651483716701107)))
       * (1,CompactBuffer((100002,0.48666426339228763), (100003,0.7071067811865475), (100001,0.9128709291752769)))
       */
    }.groupByKey()

    // step2
    val unpack_rdd = ui_rdd.flatMap { x =>
      val is_arr = x._2.toArray
      var ii_s_arr = new ArrayBuffer[((String, String), Double)]()
      for (i <- 0 until is_arr.length - 1) {
        for (j <- 0 until is_arr.length) {
          ii_s_arr += (((is_arr(i)._1, is_arr(j)._1), is_arr(i)._2 * is_arr(j)._2))
          ii_s_arr += (((is_arr(j)._1, is_arr(i)._1), is_arr(i)._2 * is_arr(j)._2))
        }
      }
      ii_s_arr
      /* test output:
       * ((100002,100002),0.10526315789473685)
       * ((100002,100002),0.10526315789473685)
       * ((100002,100003),0.22941573387056174)
       * ((100003,100002),0.22941573387056174)
       * ((100002,100004),0.3244428422615251)
       * ((100004,100002),0.3244428422615251)
       * ((100002,100001),0.05923488777590923)
       * ((100001,100002),0.05923488777590923)
       * ((100003,100002),0.22941573387056174)
       * ((100002,100003),0.22941573387056174)
       * ((100003,100003),0.4999999999999999)
       * ((100003,100003),0.4999999999999999)
       */
    }

    // step3
    unpack_rdd.groupByKey().map { x =>
      val ii_pair = x._1
      val s_list = x._2
      val s_arr = s_list.toArray
      var score: Double = 0.0
      for (i <- 0 until s_arr.length) {
        score += s_arr(i)
      }
      (ii_pair._1, (ii_pair._2, score))
      /* test output:
       * (100002,(100002,2.0))
       * (100002,(100001,0.7996709849747747))
       * (100001,(100003,0.7745966692414834))
       * (100003,(100002,1.1470786693528088))
       * (100001,(100004,0.18257418583505536))
       * (100004,(100001,0.18257418583505536))
       * (100004,(100002,0.6488856845230502))
       * (100004,(100004,2.0))
       * (100003,(100001,0.7745966692414834))
       * (100003,(100003,1.9999999999999996))
       * (100002,(100004,0.6488856845230502))
       * (100001,(100002,0.7996709849747747))
       * (100003,(100004,1.414213562373095))
       * (100004,(100003,1.414213562373095))
       * (100002,(100003,1.1470786693528088))
       */
    }.groupByKey().map { x =>
      val item_a = x._1
      val item_list = x._2
      val bs_arr = item_list.toArray.sortWith(_._2 > _._2)
      var len = bs_arr.length
      if (len > topn) {
        len = topn
      }
      val s = new StringBuilder
      for (i <- 0 until len) {
        val item = bs_arr(i)._1
        val score = "%1.4f" format bs_arr(i)._2
        s.append(item + ":" + score)
        if (i < len - 1) {
          s.append(",")
        }
      }
      item_a + "\t" + s
    }.saveAsTextFile(output_path)
  }
}
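What steps 1-3 compute is essentially item-to-item cosine similarity: step 1 turns every item into a unit-length vector of user scores, step 2 emits the per-user products for item pairs, and step 3 sums them (note that the inner loop in step 2 starts at 0, so some pairs are double-counted and their sums come out larger than the plain cosine value). A small standalone sketch of the underlying formula, using the scores of items 100002 and 100003 from user_item_score.txt (object and variable names are illustrative only):

object CosineSketch {
  // cosine(a, b) = dot(a, b) / (|a| * |b|), taken over the users the two items share
  def cosine(a: Map[String, Double], b: Map[String, Double]): Double = {
    val dot = a.keySet.intersect(b.keySet).toSeq.map(u => a(u) * b(u)).sum
    def norm(v: Map[String, Double]) = math.sqrt(v.values.map(x => x * x).sum)
    dot / (norm(a) * norm(b))
  }

  def main(args: Array[String]): Unit = {
    val item100002 = Map("1" -> 3.0, "2" -> 2.0, "3" -> 5.0)  // user -> score
    val item100003 = Map("1" -> 4.0, "2" -> 4.0)
    println(cosine(item100002, item100003))  // ~0.5735
  }
}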

Test with the program arguments set (args(0) = input path, args(1) = output path):

Result: each output line is an item followed by its top 5 most similar items in item:score form.
