
Advanced Analytics with Spark: Geospatial and Temporal Data Analysis of New York City Taxi Trips

Date: 2023-01-28 18:14:36


Original article: /p/eb6f3e0c09b5

Author: IIGEOywq

1. Geospatial Analysis

package com.cloudera.datascience.geotime

import java.text.SimpleDateFormat
import java.util.Locale

import com.esri.core.geometry.Point
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.{DateTime, Duration}
import spray.json._

// Feature, FeatureCollection and the RichGeometry/GeoJsonProtocol implicits are
// defined in the project's GeoJSON helper source (not shown in this listing);
// the import below assumes that layout.
import com.cloudera.datascience.geotime.GeoJsonProtocol._

object RunGeoTime extends Serializable {

  val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH)

  def main(args: Array[String]): Unit = {
    /*-------------- 1. Initialize the SparkContext --------------*/
    val sc = new SparkContext(new SparkConf().setAppName("SpaceGeo"))

    /*-------------- 2. Read the raw taxi data from HDFS --------------*/
    val taxiRaw = sc.textFile("hdfs://master:9000/taxidata")

    /*-------------- 3. Preprocess the taxi data --------------*/
    // 3.1 Parse the raw records with the custom safe() wrapper
    val safeParse = safe(parse)
    val taxiParsed = taxiRaw.map(safeParse)
    // Persist taxiParsed
    taxiParsed.cache()

    // Inspect the malformed records:
    /* val taxiBad = taxiParsed.collect({case t if t.isRight => t.right.get}) */
    // collect() brings the data back to the driver; useful for local development
    // and testing, not recommended on a cluster
    // taxiBad.collect().foreach(println)
    /*
    val taxiGood = taxiParsed.collect({case t if t.isLeft => t.left.get})
    taxiGood.cache()
    */

    // 3.2 Drop the malformed records and keep only well-formed trips
    val taxiGood = taxiParsed.filter(_.isLeft).map(_.left.get)
    taxiGood.cache()

    // Duration of a single taxi ride, in whole hours
    def hours(trip: Trip): Long = {
      val d = new Duration(trip.pickupTime, trip.dropoffTime)
      d.getStandardHours
    }

    // 3.3 Print the distribution of ride durations (output 1 in the results figure)
    taxiGood.values.map(hours).countByValue().toList.sorted.foreach(println)
    taxiParsed.unpersist()

    // Based on the output above, keep rides lasting at least 0 and less than 3 hours
    val taxiClean = taxiGood.filter {
      case (lic, trip) => {
        val hrs = hours(trip)
        0 <= hrs && hrs < 3
      }
    }

    /*-------------- 4. Spatial analysis of the taxi data --------------*/
    // 4.1 Load the New York borough boundary data
    val geojson = scala.io.Source.fromURL(getClass.getResource("/nyc-boroughs.geojson")).mkString
    // Convert the GeoJSON into geographic features
    val features = geojson.parseJson.convertTo[FeatureCollection]
    val areaSortedFeatures = features.sortBy(f => {
      val borough = f("boroughCode").convertTo[Int]
      (borough, -f.geometry.area2D())
    })
    val bFeatures = sc.broadcast(areaSortedFeatures)

    // 4.2 Determine which borough a passenger's drop-off point falls in
    def borough(trip: Trip): Option[String] = {
      val feature: Option[Feature] = bFeatures.value.find(f => {
        f.geometry.contains(trip.dropoffLoc)
      })
      feature.map(f => {
        f("borough").convertTo[String]
      })
    }

    // 4.3 First count of drop-off points per borough (output 2 in the results figure)
    taxiClean.values.map(borough).countByValue().foreach(println)

    // 4.4 Remove trips whose pickup or drop-off location is missing
    def hasZero(trip: Trip): Boolean = {
      val zero = new Point(0.0, 0.0)
      (zero.equals(trip.pickupLoc) || zero.equals(trip.dropoffLoc))
    }
    val taxiDone = taxiClean.filter {
      case (lic, trip) => !hasZero(trip)
    }.cache()

    // 4.5 Count drop-off points per borough again after removing the zero points
    //     (output 3 in the results figure)
    taxiDone.values.map(borough).countByValue().foreach(println)
    taxiGood.unpersist()

    // Write the geospatial analysis result to HDFS
    // taxiDone.saveAsTextFile("hdfs://master:9000/GeoResult")
  }

  // Convert longitude/latitude strings into a Point
  def point(longitude: String, latitude: String): Point = {
    new Point(longitude.toDouble, latitude.toDouble)
  }

  // Extract the driver's license and a Trip object from one record of the taxiRaw RDD
  def parse(line: String): (String, Trip) = {
    val fields = line.split(',')
    val license = fields(1)
    // SimpleDateFormat is not thread-safe, so work on a copy:
    val formatterCopy = formatter.clone().asInstanceOf[SimpleDateFormat]
    val pickupTime = new DateTime(formatterCopy.parse(fields(5)))
    val dropoffTime = new DateTime(formatterCopy.parse(fields(6)))
    val pickupLoc = point(fields(10), fields(11))
    val dropoffLoc = point(fields(12), fields(13))
    val trip = Trip(pickupTime, dropoffTime, pickupLoc, dropoffLoc)
    (license, trip)
  }

  // Wrap a parsing function so that a malformed record is returned as Right((input, exception))
  // instead of throwing and failing the whole job
  def safe[S, T](f: S => T): S => Either[T, (S, Exception)] = {
    new Function[S, Either[T, (S, Exception)]] with Serializable {
      def apply(s: S): Either[T, (S, Exception)] = {
        try {
          Left(f(s))
        } catch {
          case e: Exception => Right((s, e))
        }
      }
    }
  }
}
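The driver above also relies on a Trip case class (plus the Feature, FeatureCollection, and GeoJsonProtocol helpers) defined elsewhere in the same project and not shown in the listing. As a reference, here is a minimal sketch of the Trip model inferred from how the driver uses it; the field names follow the accesses in the code (trip.pickupTime, trip.dropoffLoc, and the Trip(...) constructor call in parse), and the rest is an assumption.

package com.cloudera.datascience.geotime

import com.esri.core.geometry.Point
import org.joda.time.DateTime

// Minimal sketch of the trip record used by RunGeoTime; field names are taken
// from the driver code, everything else is an assumption about the helper file.
case class Trip(
  pickupTime: DateTime,   // when the passenger was picked up
  dropoffTime: DateTime,  // when the passenger was dropped off
  pickupLoc: Point,       // pickup location (longitude, latitude)
  dropoffLoc: Point       // drop-off location (longitude, latitude)
)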

2. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.cloudera.datascience.geotime</groupId>
  <artifactId>ch08-geotime</artifactId>
  <packaging>jar</packaging>
  <name>Temporal and Geospatial Analysis</name>
  <version>2.0.0</version>

  <dependencies>
    <!-- Note: the Scala version must match the Scala version on the Spark cluster; the provided scope is required -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.8</version>
      <scope>provided</scope>
    </dependency>
    <!-- Note: the Hadoop version must match the Hadoop version on the Spark cluster; the provided scope is required -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.3</version>
      <scope>provided</scope>
    </dependency>
    <!-- Note: the Spark version must match the Spark version on the cluster; 2.11 is the matching Scala binary version -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.1</version>
      <scope>provided</scope>
    </dependency>
    <!-- nscala-time date/time library; 2.11 is the matching Scala binary version -->
    <dependency>
      <groupId>com.github.nscala-time</groupId>
      <artifactId>nscala-time_2.11</artifactId>
      <version>1.8.0</version>
    </dependency>
    <!-- Esri geometry library for spatial relationships -->
    <dependency>
      <groupId>com.esri.geometry</groupId>
      <artifactId>esri-geometry-api</artifactId>
      <version>1.2.1</version>
    </dependency>
    <dependency>
      <groupId>io.spray</groupId>
      <artifactId>spray-json_2.11</artifactId>
      <version>1.3.2</version>
    </dependency>
    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>2.9.4</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- The scala-maven-plugin is required; without it the packaged jar contains no compiled main program -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <configuration>
          <scalaVersion>2.11.8</scalaVersion>
          <scalaCompatVersion>2.11.8</scalaCompatVersion>
          <args>
            <arg>-unchecked</arg>
            <arg>-deprecation</arg>
            <arg>-feature</arg>
          </args>
          <javacArgs>
            <javacArg>-source</javacArg>
            <javacArg>1.8</javacArg>
            <javacArg>-target</javacArg>
            <javacArg>1.8</javacArg>
          </javacArgs>
        </configuration>
        <executions>
          <execution>
            <phase>compile</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <!-- The maven-assembly-plugin bundles the application's dependencies into the jar -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.6</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.cloudera.datascience.geotime.RunGeoTime</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <recompressZippedFiles>false</recompressZippedFiles>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id> <!-- this id is used for inheritance merges in multi-module builds -->
            <phase>package</phase> <!-- bind to the package phase -->
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
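The comments in the pom stress that the Scala, Hadoop, and Spark versions must match what is actually running on the cluster. A quick way to check is from a spark-shell session on the cluster; the snippet below is a minimal sketch, where sc is the SparkContext that spark-shell provides, and 2.0.1 / 2.11.x are the versions this particular pom assumes.

// Run inside spark-shell on the cluster to confirm the versions the pom must match.
println("Spark version: " + sc.version)                           // should match spark-core_2.11 version 2.0.1
println("Scala version: " + scala.util.Properties.versionString)  // should report a 2.11.x release

After mvn package, the maven-assembly-plugin produces an assembly jar (typically named ch08-geotime-2.0.0-jar-with-dependencies.jar) whose manifest main class is com.cloudera.datascience.geotime.RunGeoTime; that is the artifact to submit to the cluster with spark-submit.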
