包含sparksql的完整使用案例,请务必耐心看完
专题:大数据单机学习环境搭建和使用
1.Spark安装2.Spark配置2.1配置环境变量2.2spark客户端3.Spark使用3.1环境准备3.2脚本说明3.3服务开启3.4脚本执行大数据单机学习环境搭建(9)Spark单节点安装与pyspark使用
1.Spark安装
apache官网下载spark
个人下载的资源分享
# 解压安装,我的位置都在/opttar -zxvf spark-3.2.1-bin-hadoop3.2.tgz# 改名mv spark-3.2.1-bin-hadoop3.2 spark
2.Spark配置
2.1配置环境变量
# 配置环境变量vim /etc/profile# 添加内容export SPARK_HOME=/opt/sparkexport PYSPARK_PYTHON=/opt/anaconda3/bin/pythonexport HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop# 更新配置文件使其立即生效source /etc/profile# /root/.bashrc配置在JAVA_HOME和PYSPARK_PYTHONexport JAVA_HOME=/opt/jdkexport PYSPARK_PYTHON=/opt/anaconda3/bin/python
2.2spark客户端
下面所有内容都是Spark Local环境下运行,从Pyspark解释器启动和样例脚本构建SparkSession都可以看到
/opt/spark/bin/pyspark # 交互式的 Python解释器环境/opt/spark/bin/spark-shell # scala解释器环境/opt/spark/bin/spark-submit # 提交jar包或Python文件执行的工具/opt/spark/bin/spark-sql # sparksql客户端工具
pyspark解释器运行示例,进入后可写python代码执行
3.Spark使用
没有废话要仔细看,完整可运行
3.1环境准备
Hadoop本地单节点安装
Hive安装和启用
Linux单节点Anaconda安装和Pycharm连接 中1和2的内容完成Anaconda的pyspark包安装
3.2脚本说明
sparksql构建SparkSession需要用到HDFS和metastore服务,所以需要提前开启数据准备
1)数据内容自行设置一个如下数据就行,文件命名问tmsapp_tms_app_instant_tag_visit_a_d20520格式,后面日期换成当前日期,因为脚本中用的是
datetime.datetime.now().strftime('%Y%m%d')
作为文件补充名
LH91417,青铜会员LH15905,原木会员LH29615,原木会员LH88779,黄金会员
2)把文件放在这个路径下/home/tmp_app_data/,没有路径就自建mkdir /home/tmp_app_data
,或者自己改动脚本中load路径
3)脚本文件放在自己记得住的位置,位置名字随便起,例如我的是 /tmp/pycharm_project_652/LHbank/01pyspark_sql_visit_mid.py脚本内容,包含删表、建表、删除分区、建立分区、插入数据、查询数据,共6步sql操作。
# -*- coding:utf-8 -*-"""删表、建表、删除分区、建立分区、插入数据、查询数据"""from pyspark.sql import SparkSessionimport datetimespark = SparkSession.builder \.appName('learn') \.master("local[*]") \.config("spark.sql.shuffle.partitions", 1) \.config("spark.sql.warehouse.dir", "hdfs://自己的ip:8020/user/hive/warehouse") \.config("hive.metastore.uris", "thrift://自己的ip:9083") \.enableHiveSupport() \.getOrCreate()# 删表sql_drop_table = """drop table if exists tms_app.tmsapp_tms_app_instant_tag_visit_a_d;"""spark.sql(sql_drop_table)# 建表sql_create_table = """create table tms_app.tmsapp_tms_app_instant_tag_visit_a_d(cust_id string comment '客户号',vip_namestringcomment '会员名称')partitioned by (dt string)row format delimited fields terminated by ',';"""spark.sql(sql_create_table)# 分区格式yyyy-MM-dd,当前日期yyyyMMdd格式cur_dt = datetime.datetime.now().strftime('%Y-%m-%d')cur_dt_simple = datetime.datetime.now().strftime('%Y%m%d')# 删除分区sql_drop_partition = """alter table tms_app.tmsapp_tms_app_instant_tag_visit_a_d drop if exists partition (dt='{}');""".format(cur_dt)spark.sql(sql_drop_partition)# 增加分区sql_add_partition = """alter table tms_app.tmsapp_tms_app_instant_tag_visit_a_d add partition (dt='{}');""".format(cur_dt)spark.sql(sql_add_partition)# load数据sql_load_data = """load data local inpath '/home/tmp_app_data/tmsapp_tms_app_instant_tag_visit_a_d{}.txt' overwrite into table tms_app.tmsapp_tms_app_instant_tag_visit_a_d partition (dt='{}');""".format(cur_dt_simple, cur_dt)spark.sql(sql_load_data)# 查询sql = """select * from tms_app.tmsapp_tms_app_instant_tag_visit_a_d where dt='{}' limit 5;""".format(cur_dt)spark.sql(sql).show()print('OK')spark.stop()
3.3服务开启
开启hadoop和hive metastore服务/opt/hadoop/sbin/start-all.sh nohup /opt/hive/bin/hive --service metastore &netstat -anp| grep 9083jps
结果如图关闭防火墙打开systemctl stop firewalld
Namenode网页http://自己的ip:9870
3.4脚本执行
先用hive创建一个名为tms_app
的数据库,名称无所谓,我的示例样本写的这个,所以就创建了这个。
hive -e 'create database tms_app;'
调用脚本执行
spark-submit /tmp/pycharm_project_652/LHbank/01pyspark_sql_visit_mid.py
HDFS查看数据文件
至此,spark已可以正常运行,pyspark.sql的案例也介绍完毕了。欢迎接着看 Pycharm中操作spark和hive
声明:本文所载信息不保证准确性和完整性。文中所述内容和意见仅供参考,不构成实际商业建议,如有雷同纯属巧合。