600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > scala中json与case class对象的转换 spark读取es json转换成case class

scala中json与case class对象的转换 spark读取es json转换成case class

时间:2024-07-01 09:34:10

相关推荐

scala中json与case class对象的转换  spark读取es json转换成case class

ilinux_one

scala中json与对象的转换

遇到的问题

因为要把spark从es读出来的json数据转换为对象,开始想用case class定义类型,通过fastjson做转换。如下

复制代码

复制代码

case class Book (author: String, content: String, id: String, time: Long, title: String)val json = "{\"author\":\"hll\",\"content\":\"ES即etamsports\",\"id\":\"693\",\"time\":1490165237200,\"title\":\"百度百科\"}"val mapper: ObjectMapper = new ObjectMapper()val book: Book = mapper.readValue(json, classOf[Book])

结果抛出了异常:com.fasterxml.jackson.databind.JsonMappingException: No suitable constructor found for type [simple type, class JsonTest$Book]

换成fastjson也会有相似的异常。

恍然大悟,case class没有空参构造函数,跟fastjson这些库不太兼容。

解决办法

然而又不想就java class,然后就找到了json4s-jackson,可以完美兼容scala的case class。

pom依赖:

<dependency><groupId>org.json4s</groupId><artifactId>json4s-jackson_2.10</artifactId><version>3.2.10</version></dependency>

使用的样例代码:

//隐式转换必须要导入

import org.json4s._import org.json4s.jackson.JsonMethods._class Book(val author: String,val content: String,val id: String, val time: Long, val title: String)object JsonTest {def main(args: Array[String]) {val json = "{\"author\":\"hll\",\"content\":\"ES即etamsports\",\"id\":\"693\",\"time\":1490165237200,\"title\":\"百度百科\"}"//导入隐式值implicit val formats = DefaultFormatsval book: Book = parse(json).extract[Book]println(book.content)}}

实际使用与思考

spark程序中的应用:

implicit val formats = DefaultFormatsesRDD.map(_._2).map(parse(_).extract[Book]).sortBy(_.time, false).take(10).foreach(println)

spark里面解析json数据有一个经典的问题,ObjectMapper对象的创建很重。一般使用mapPartition来对一个分区复用ObjectMapper对象。

我们来看一下parse方法的源码:

private[this] lazy val _defaultMapper = {val m = new ObjectMapper()m.registerModule(new Json4sScalaModule)m}def mapper = _defaultMapperdef parse(in: JsonInput, useBigDecimalForDouble: Boolean = false): JValue = {mapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, useBigDecimalForDouble)in match {case StringInput(s) => mapper.readValue(s, classOf[JValue])case ReaderInput(rdr) => mapper.readValue(rdr, classOf[JValue])case StreamInput(stream) => mapper.readValue(stream, classOf[JValue])case FileInput(file) => mapper.readValue(file, classOf[JValue])}}

实际使用的ObjectMapper对象是lazy初始化的而且是复用的,避免了ObjectMapper对象的重复创建,很nice。

转自/ilinuxer/p/6864034.html

个人实现

import org.apache.spark.{SparkConf, SparkContext}import org.elasticsearch.spark._import org.json4s._import org.json4s.jackson.JsonMethods._object SparkOnEs extends Serializable {implicit val formats = DefaultFormatscase class cc(vin: String)def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local").setAppName("aaa")conf.set("es.nodes", "xxx")conf.set("es.port", "9200")conf.set("es.mapping.date.rich", "false")conf.set("es.index.read.missing.as.empty", "true")val sc = new SparkContext(conf)val value = sc.esJsonRDD("xxxx")value.map(_._2).map(parse(_).extract[cc]).take(10).foreach(println)}}

相关引包

<dependency><groupId>org.json4s</groupId><artifactId>json4s-jackson_2.11</artifactId><version>3.6.7</version></dependency>

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。