600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > pyspark的rdd直接写入mysql

pyspark的rdd直接写入mysql

时间:2023-05-24 11:46:35

相关推荐

pyspark的rdd直接写入mysql

Google搜索"RDD write into mysql"前面5页得到:[5][6][7][8][9][10]

我们一个个来分析

[1][2][3]读出的是RDD,写入的是foreachpartition的方式

[4]写入的不是spark RDD,而是一个Spark的DataFrame类型的变量

[5]写入的不是spark RDD,而是Spark RDD转化为DataFrame类型然后写入

[6][7]写入的是spark DataFrame

[8]仅仅是代码bug而已

[9]写入的是dataframe不是rdd

[10]提到了检查partition的数量可以加速rdd的写入,里面有两种方式,第一种是RDD写入,第二种是Data Frame写入。

结论:

[1][2][3][10]有用(都是scala语言),其他都没有用。

[11]提到了针对partition来提高速度。

尝试Pyspark版本:

import pandas as pdfrom pyspark.sql import SparkSessionfrom pyspark import SparkContextfrom pyspark.sql import SQLContextfrom pymysql import *def map_extract(element):file_path, content = elementyear = file_path[-8:-4]return [(year, i) for i in content.split("\n") if i]spark = SparkSession\.builder\.appName("PythonTest")\.getOrCreate()res = spark.sparkContext.wholeTextFiles('hdfs://Desktop:9000/user/mercury/names',minPartitions=40) \.map(map_extract) \.flatMap(lambda x: x) \.map(lambda x: (x[0], int(x[1].split(',')[2]))) \.reduceByKey(lambda x,y:x+y)def write2mysql(x):conn = connect(host='Desktop', port=3306, database='leaf', user='appleyuchi',password='appleyuchi', charset='utf8')cs1 = conn.cursor()# 执行sql语句#---------------------------------------for item in x:key = str(item[0])num = item[1]# values = (key,num)# print("values=",values)sql = "insert into `spark`(`key`,`num`) values (%s,%s)"cs1.execute(sql,(key,num))# 提交之前的操作,如果之前已经执行多次的execute,那么就都进行提交mit()#---------------------------------------# 关闭cursor对象cs1.close()# 关闭connection对象conn.close()res.foreachPartition(lambda x:write2mysql(x))

数据集用的是 :

/wesm/pydata-book/tree/2nd-edition/datasets/babynames

如果报错:

pymysql.err.InternalError: (1049, "Unknown database 'leaf'")

查过存在该数据库还是解决不了的话,参考:

/appleyuchi/cluster_configuration/blob/master/物理环境配置流程-必须先看.txt

Reference:

[1]RDD从mysql中读取数据和RDD往数据库中存数据(代码完整)

[2]RDD 直接存入MySQL,以及直接读取MySQL中数据(代码不完整)

[3]spark以rdd方式读写mysql

[4]Spark RDD写入RMDB(Mysql)方法二

[5]How to put data from Spark RDD to Mysql Table

[6]WRITING TO A DATABASE FROM SPARK

[7]save spark rdd into Mysql

[8]Writing from PySpark to MySQL Database

[9]spark 2.1写入mysql spark 2.1 write to mysql

[10]how to properly save spark rdd result to a mysql database(代码不完整)

[11]计算质数通过分区(Partition)提高Spark的运行性能

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。