600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > Hadoop学习之路(五):Hadoop交互关系型数据库(MySQL)

Hadoop学习之路(五):Hadoop交互关系型数据库(MySQL)

时间:2023-06-03 14:09:41

相关推荐

Hadoop学习之路(五):Hadoop交互关系型数据库(MySQL)

内容简介

一、Hadoop与数据库交互简介二、操作前的准备1.创建表2.将数据插入表words中2.将MySQL的驱动分发到所有的Hadoop节点三、Hadoop与MySQL交互实现1.创建Java工程,添加Maven支持2.创建并编写类MySqlDBWritable3.创建并编写类MapByMysql4.创建并编写类ReducerByMysql5.创建并编写作业类MapByMysqlApp6.打包作业提交到集群7.执行程序查看结果四、总结

一、Hadoop与数据库交互简介

Hadoop在处理数据过程中接触得较多的就是文本文件即TextInputFormat,这也是MapReducer框架默认的输入格式,但是在某些情况下Hadoop需要与关系型数据库,如MySQL、Oracle等进行交互,有以下原因:

需要将关系型数据库的数据作为处理的数据输入需要将数据的统计结果存储进关系型数据库以备分析将关系型数据库的数据迁移进HDFS

第于第三点而言,更为流行的做法就是使用Apache 的 Sqoop,先挖个坑这个以后会介绍。为此,Hadoop提供DBInputFormat输入类,满足这个需求。DBInputFormat使用JDBC从关系型数据库中读取数据,而与之对应的输出类是DBOutputFormat,它也是使用JDBC将数据写入关系型数据库中。

为了演示全面,本次操作将完成这样一个任务,从数据库的words表中读取单词,然后统计单词个数后将结果写入stats表中。具体做法是先编写类MySqlDBWritable,对从关系型数据库中取出的数据进行串行化与逆串行化,然后编写常规的Map与Reducer统计单词个数,并将数据分装进MySqlDBWritable中写入数据库。

二、操作前的准备

Hadoop版本是:2.6.0-cdh5.7.0

开发工具是:IDEA

关系型数据库是:MySQL5.6.26

1.创建表

在MySQL中执行如下命令:

创建表words:create table words(id int primary key auto_increment,line varchar(256)),其中字段line代表一行空格分隔的单词。

创建表status:create table stats(word varchar(25),count int),其中字段word 代表统计的单词,count 是其统计个数。

2.将数据插入表words中

hello world hello hadoophello spark hello hbasehello hive hello hadoophello kafka hello flumehello flink hello sqoophello spark hello hbasehello kafka hello flumehello spark hello hbase

插入数据后words表数据为:

2.将MySQL的驱动分发到所有的Hadoop节点

这一步很关键,因为DBInputFormat和DBOutputFormat需要使用JDBC与MySQL进行交互,而Hadoop默认情况下是没有MySQL的驱动,所以需要将MySQL的驱动分发至所有数据节点上,具体路径是:$HADOOP_HOME/share/hadoop/common/lib下。

三、Hadoop与MySQL交互实现

1.创建Java工程,添加Maven支持

完整的Maven依赖如下:

<properties><hadoop.version>2.6.0-cdh5.7.0</hadoop.version></properties><repositories><repository><id>cloudera</id><url>/artifactory/cloudera-repos</url></repository></repositories><dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency></dependencies><build><pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --><plugins><plugin><artifactId>maven-clean-plugin</artifactId><version>3.0.0</version></plugin><!-- see /ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging --><plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.7.0</version></plugin><plugin><artifactId>maven-surefire-plugin</artifactId><version>2.20.1</version></plugin><plugin><artifactId>maven-jar-plugin</artifactId><version>3.0.2</version></plugin><plugin><artifactId>maven-install-plugin</artifactId><version>2.5.2</version></plugin><plugin><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version></plugin></plugins></pluginManagement><finalName>mysqlapp1.0.0</finalName></build>

2.创建并编写类MySqlDBWritable

/*** 1.封装words表和stats表的字段以及对其串行化和逆串行化* 2.读取words表封装进该类进行串行化* 3.讲统计结果封装进该类进行逆串行化后写入MySQL*/public class MySqlDBWritable implements DBWritable, Writable {//words表字段private int id;private String line;//stats表字段private String word;private int count;public int getId() {return id;}public String getWord() {return word;}public void setWord(String word) {this.word = word;}public int getCount() {return count;}public void setCount(int count) {this.count = count;}public void setId(int id) {this.id = id;}public String getLine() {return line;}public void setLine(String line) {this.line = line;}//串行化@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(id);out.writeUTF(line);out.writeUTF(word);out.writeInt(count);}//逆串行化@Overridepublic void readFields(DataInput in) throws IOException {id = in.readInt();line = in.readUTF();word = in.readUTF();count = in.readInt();}//往数据库表stats写入数据@Overridepublic void write(PreparedStatement statement) throws SQLException {statement.setString(1,word);statement.setInt(2,count);}//在数据库表words中读取数据@Overridepublic void readFields(ResultSet resultSet) throws SQLException {id = resultSet.getInt(1);line = resultSet.getString(2);}}

3.创建并编写类MapByMysql

/*** 从MySQL中的words表读取数据* 并将其映射为(word,1)元组*/public class MapByMysqlClass extends Mapper<LongWritable,MySqlDBWritable, Text, IntWritable> {@Overrideprotected void map(LongWritable key, MySqlDBWritable value, Context context) throws IOException, InterruptedException {//将words表的line字段取出来以空格分隔String[] words = value.getLine().split(" ");//映射为(word,1)元组for(String word:words){context.write(new Text(word),new IntWritable(1));}}}

4.创建并编写类ReducerByMysql

/*** 对(word,1)元组中的word聚合* 并结果写入MySqlDBWritable中*/public class ReducerByMysqlClass extends Reducer<Text, IntWritable,MySqlDBWritable, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int count = 0;MySqlDBWritable mySqlDBWritable = new MySqlDBWritable();//迭代统计for(IntWritable i : values){count += i.get();}//将统计结果写入MySqlDBWritable中mySqlDBWritable.setWord(key.toString());mySqlDBWritable.setCount(count);context.write(mySqlDBWritable,NullWritable.get());}}

5.创建并编写作业类MapByMysqlApp

public class MapByMysqlApp {public static void main(String[] args) throws Exception{Configuration conf = new Configuration();//MySQL驱动String classDriver = "com.mysql.jdbc.Driver";//MySQL连接串,后面跟存放你创建表的数据库String url="jdbc:mysql://hadoop00:3306/jdbc";//账号密码String userName = "root";String password = "root";//实例化作业Job job = Job.getInstance(conf,"MapByMysqlApp");//设置作业主类job.setJarByClass(MapByMysqlApp.class);//设置数据库输入类型job.setInputFormatClass(DBInputFormat.class);//设置数据库输出类型job.setOutputFormatClass(DBOutputFormat.class);//设置Map类job.setMapperClass(MapByMysqlClass.class);//设置Map类Key的输出类型job.setMapOutputKeyClass(Text.class);//设置Map类Value的输出类型job.setMapOutputValueClass(IntWritable.class);//设置Reducer类job.setReducerClass(ReducerByMysqlClass.class);//设置Reducer类Key的输出类型job.setOutputKeyClass(MySqlDBWritable.class);//设置Reducer类Value的输出类型job.setOutputValueClass(NullWritable.class);//配置数据库信息DBConfiguration.configureDB(job.getConfiguration(),classDriver,url,userName,password);//输入类型为数据库关系类型,通过SQL语句将查询的数据封装进MySqlDBWritable中DBInputFormat.setInput(job,MySqlDBWritable.class,"select * from words","select count(*) from words");//输出类型为数据库关系类型,将数据插入到status表中,字段分别是word,countDBOutputFormat.setOutput(job,"stats","word","count");//提价作业job.waitForCompletion(true);}}

6.打包作业提交到集群

在工程项目路径下的target文件夹下有生成的jar包,把它提交到集群主机上。

7.执行程序查看结果

执行命令:hadoop jar mysqlapp1.0.0.jar com.hadoop.mysql.MapByMysqlApp,第一个参数是你的jar包的路径,第二个参数是作业主类的全路径。进入MySQL中执行SQL语句:select * from stats查看单词统计结果:

至此,Hadoop交互MySQL的全部过程已经完成。

四、总结

本次操作演示了Hadoop交互MySQL,包括从数据库中读取数据和将将数据写入数据库,这也是非常之常见的操作。其实,Hadoop所处理的数据类型可以混合搭配使用,比如说可以使用TextInputFormat从HDFS读取数据,处理后使用DBOutputFormat将数据写入到关系型数据库,非常的灵活,还支持多种格式同时输入等等,这些后面都会介绍。非常感您的阅读,如有错误请不吝赐教!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。