Spark-SQL: Converting CSV to Parquet Files, Using Either the Default Block-Based Partition Count or a Custom Partition Count

Date: 2021-05-03 04:42:23

I. Create an external partitioned table in spark-sql

1. Start spark-sql

spark-sql --queue spark --master yarn --deploy-mode client --num-executors 10 --executor-cores 2 --executor-memory 3G

2. Create the Parquet partitioned table in spark-sql

create external table pgls.convert_parq(
  bill_num string,
  logis_id string,
  store_id string,
  store_code string,
  creater_id string,
  order_status INT,
  pay_status INT,
  order_require_varieties INT,
  order_require_amount decimal(19,4),
  order_rec_amount decimal(19,4),
  order_rec_gpf decimal(19,4),
  deli_fee FLOAT,
  order_type INT,
  last_modify_time timestamp,
  order_submit_time timestamp
)
partitioned by (order_submit_date date)
row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
stored as parquetfile
location '/test/spark/convert/parquet/bill_parq/';

II. Convert CSV to Parquet

Code: org.apache.spark.ConvertToParquet.scala

package org.apache.spark

// SparkConf and SparkContext need no import: this file lives in package org.apache.spark.
import com.ecfront.fs.operation.HDFSOperation
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

/**
 * Converts CSV to Parquet.
 * Arguments: input path, output path, number of partitions.
 */
object ConvertToParquet {

  def main(args: Array[String]) {
    if (args.length != 3) {
      println("jar args: inputFiles outPath numpartitions")
      System.exit(1) // non-zero exit code signals the usage error
    }
    val inputPath = args(0)
    val outPath = args(1)
    val numPartitions = args(2).toInt
    println("==========================================")
    println("=========input: " + inputPath)
    println("=========output: " + outPath)
    println("==numPartitions: " + numPartitions)
    println("==========================================")

    // If the output directory already exists, delete it
    val fo = HDFSOperation(new Configuration())
    val existDir = fo.existDir(outPath)
    if (existDir) {
      println("HDFS exists outpath: " + outPath)
      println("start to delete ...")
      val isDelete = fo.deleteDir(outPath)
      if (isDelete) {
        println(outPath + " delete done. ")
      }
    }

    val conf = new SparkConf()
    val sc = new SparkContext(conf)     // create the SparkContext from the SparkConf
    val sqlContext = new SQLContext(sc) // create the SQLContext from the SparkContext

    val schema = StructType(Array(
      StructField("bill_num", DataTypes.StringType, false),
      StructField("logis_id", DataTypes.StringType, false),
      StructField("store_id", DataTypes.StringType, false),
      StructField("store_code", DataTypes.StringType, false),
      StructField("creater_id", DataTypes.StringType, false),
      StructField("order_status", DataTypes.IntegerType, false),
      StructField("pay_status", DataTypes.IntegerType, false),
      StructField("order_require_varieties", DataTypes.IntegerType, false),
      StructField("order_require_amount", DataTypes.createDecimalType(19, 4), false),
      StructField("order_rec_amount", DataTypes.createDecimalType(19, 4), false),
      StructField("order_rec_gpf", DataTypes.createDecimalType(19, 4), false),
      StructField("deli_fee", DataTypes.FloatType, false),
      StructField("order_type", DataTypes.IntegerType, false),
      StructField("last_modify_time", DataTypes.TimestampType, false),
      StructField("order_submit_time", DataTypes.TimestampType, false),
      StructField("order_submit_date", DataTypes.DateType, false)))

    convert(sqlContext, inputPath, schema, outPath, numPartitions)
  }

  // Convert CSV to Parquet
  def convert(sqlContext: SQLContext, inputpath: String, schema: StructType,
              outpath: String, numPartitions: Int) {
    // Load the CSV text into a DataFrame
    val df = sqlContext.read.format("com.databricks.spark.csv")
      .schema(schema)
      .option("delimiter", ",")
      .load(inputpath)
    // Write as Parquet
    // df.write.parquet(outpath) // partition count equals the number of input blocks
    df.coalesce(numPartitions).write.parquet(outpath) // custom partition count
  }
}
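The commented-out line in the code above notes that, without coalesce, the partition count follows the number of input blocks. As a rough illustration of that arithmetic, here is a minimal sketch that estimates the block-based count from file sizes. The object name PartitionEstimate, its methods, and the sample file sizes are hypothetical, and the calculation is a simplification: it treats each file independently, uses plain ceiling division, and ignores Hadoop details such as split slop and minimum split size.

```scala
object PartitionEstimate {
  /** Number of block-sized splits needed to cover one file (ceiling division). */
  def splitsForFile(fileBytes: Long, blockBytes: Long): Long = {
    require(blockBytes > 0, "blockBytes must be positive")
    require(fileBytes >= 0, "fileBytes must not be negative")
    (fileBytes + blockBytes - 1) / blockBytes
  }

  /** Approximate default partition count: each file is split on its own. */
  def estimate(fileSizes: Seq[Long], blockBytes: Long): Long =
    fileSizes.map(size => splitsForFile(size, blockBytes)).sum

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    // Hypothetical input: CSV files of 300 MB, 128 MB and 10 MB with 128 MB blocks
    val sizes = Seq(300 * mb, 128 * mb, 10 * mb)
    println(estimate(sizes, 128 * mb)) // 3 + 1 + 1 = 5
  }
}
```

If an estimate like this yields many small partitions, passing a smaller numPartitions to coalesce, as the program does, reduces the number of Parquet files written.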

After packaging, upload the jar to the local directory: /soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar. Generate the CSV files on HDFS beforehand, in the HDFS directory /test/spark/convert/data/order/-05-01/. Then run the command below. The program requires three arguments, so <numPartitions> stands for the desired partition count:

spark-submit --queue spark --master yarn --num-executors 10 --executor-cores 2 --executor-memory 3G --class org.apache.spark.ConvertToParquet --name ConvertToParquet file:/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar /test/spark/convert/data/order/-05-01/ /test/spark/convert/parquet/bill_parq/order_submit_date=-05-01 <numPartitions>
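ConvertToParquet stops unless it receives exactly three arguments, and it converts the third one with toInt, which throws on non-numeric input. The following is a minimal sketch of stricter argument handling. ConvertArgs and Params are hypothetical names that are not part of the original program, and rejecting non-positive partition counts is an added assumption.

```scala
import scala.util.Try

object ConvertArgs {
  /** Parsed command-line parameters (hypothetical helper type). */
  final case class Params(inputPath: String, outPath: String, numPartitions: Int)

  /** Returns Right(params) for valid input, or Left(message) describing the problem. */
  def parse(args: Array[String]): Either[String, Params] = args match {
    case Array(in, out, n) =>
      Try(n.toInt).toOption match {
        case Some(p) if p > 0 => Right(Params(in, out, p))
        case _ => Left("numPartitions must be a positive integer, got: " + n)
      }
    case _ =>
      Left("expected 3 arguments (inputFiles outPath numPartitions), got " + args.length)
  }

  def main(args: Array[String]): Unit = parse(args) match {
    case Right(p) => println(p)
    case Left(msg) =>
      System.err.println(msg)
      sys.exit(1) // non-zero exit code so that schedulers can detect the failure
  }
}
```

Returning an Either keeps validation separate from the exit call, so the parsing logic can be checked without starting a Spark job.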

Relevant pom.xml content:

1. Dependencies:

<dependencies>
  <!-- HDFS operations -->
  <dependency>
    <groupId>com.ecfront</groupId>
    <artifactId>ez-fs</artifactId>
    <version>0.9</version>
  </dependency>
  <!-- spark -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.1</version>
  </dependency>
  <!-- spark csv: the Scala suffix must match the Spark artifacts above (_2.10) -->
  <dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.4.0</version>
  </dependency>
  <!-- hadoop -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
  </dependency>
</dependencies>

2. Plugins (including bundling the dependencies into the jar)

<build>
  <pluginManagement>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.1</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.0.2</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>1.4</version>
        <configuration>
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
        </configuration>
      </plugin>
    </plugins>
  </pluginManagement>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <executions>
        <execution>
          <id>scala-compile-first</id>
          <phase>process-resources</phase>
          <goals>
            <goal>add-source</goal>
            <goal>compile</goal>
          </goals>
        </execution>
        <execution>
          <id>scala-test-compile</id>
          <phase>process-test-resources</phase>
          <goals>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <executions>
        <execution>
          <phase>compile</phase>
          <goals>
            <goal>compile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>1.4</version>
      <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
      </configuration>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>org.apache.spark.ConvertToParquet</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

III. Add the partition to the table

Run in spark-sql:

alter table pgls.convert_parq add partition(order_submit_date='-05-01');

The corresponding data can then be queried with SQL:

select * from pgls.convert_parq where order_submit_date='-05-01' limit 5;
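The steps above rely on the Hive directory convention: the Parquet files are written to a key=value subdirectory under the table location, and the alter table statement registers that partition so queries can find it. Here is a minimal sketch of building both strings from the same inputs. PartitionHelper and its methods are hypothetical names, and "<date>" is a placeholder rather than a real partition value.

```scala
object PartitionHelper {
  /** Directory of one partition, following the Hive key=value layout. */
  def partitionPath(tableLocation: String, key: String, value: String): String =
    tableLocation.stripSuffix("/") + "/" + key + "=" + value

  /** ALTER TABLE statement that registers the partition with the metastore. */
  def addPartitionSql(table: String, key: String, value: String): String =
    "alter table " + table + " add partition(" + key + "='" + value + "')"

  def main(args: Array[String]): Unit = {
    // Table name and location are taken from the article; "<date>" is a placeholder
    val location = "/test/spark/convert/parquet/bill_parq/"
    println(partitionPath(location, "order_submit_date", "<date>"))
    println(addPartitionSql("pgls.convert_parq", "order_submit_date", "<date>"))
  }
}
```

Deriving the output path and the statement from one value helps keep the directory name and the registered partition in sync.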
