Ich bin ziemlich neu zu funken und ich versuche, ein DStream als ein Json von einem Kafka-Thema strukturiert zu erhalten, und ich möchte den Inhalt jedes json analysieren. Die json ich erhalte, ist so etwas wie diese:Parsen json in spark-streaming
{"type":"position","ident":"IBE32JZ","air_ground":"A","alt":"34000","clock":"1409733420","id":"IBE32JZ-1409715361-ed-0002:0","gs":"446","heading":"71","lat":"44.50987","lon":"2.98972","reg":"ECJRE","squawk":"1004","updateType":"A","altChange":" "}
Ich versuche, das ident Feld zu extrahieren nur, zumindest für jetzt und ich bin mit Lift-json Bibliothek de Daten zu analysieren. Mein Programm sieht wie folgt aus:
aber es wirft mich die Ausnahme unter:
java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
at net.liftweb.json.JsonAST$JValue.extract(JsonAST.scala:300)
at aero.catec.stratio.ScalaExample$.parser(ScalaExample.scala:33)
at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48)
at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:575)
at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:560)
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
Die Sache ist die, dass, wenn ein Lauf das gleiche, ohne Funken mit (Lesen aus einer Datei) es funktioniert perfekt. Das Problem beginnt, wenn ich versuche, es in ein Funkenprogramm zu bringen. Auch wenn ich die Parser-Funktion in etwa so ändere:
def parser(json: String): JValue = {
val parsedJson = parse(json)
return (parsedJson \\ "ident")
}
Es funktioniert auch. Aber wenn ich versuche, den eigentlichen String zu extrahieren, bekomme ich den gleichen Fehler.
Vielen Dank für Ihre Hilfe. Ich hoffe ich habe es gut erklärt.
Es ist wahrscheinlich ein Konflikt in der Scala-Version, die Sie verwenden. –
Kann ich annehmen, dass "paso1.extract [PlaneInfo]" geparstJson.extract [PlaneInfo] sein sollte? – Gillespie