2014-09-03 6 views
8

Ich bin ziemlich neu zu funken und ich versuche, ein DStream als ein Json von einem Kafka-Thema strukturiert zu erhalten, und ich möchte den Inhalt jedes json analysieren. Die json ich erhalte, ist so etwas wie diese:Parsen json in spark-streaming

{"type":"position","ident":"IBE32JZ","air_ground":"A","alt":"34000","clock":"1409733420","id":"IBE32JZ-1409715361-ed-0002:0","gs":"446","heading":"71","lat":"44.50987","lon":"2.98972","reg":"ECJRE","squawk":"1004","updateType":"A","altChange":" "} 

Ich versuche, das ident Feld zu extrahieren nur, zumindest für jetzt und ich bin mit Lift-json Bibliothek de Daten zu analysieren. Mein Programm sieht wie folgt aus:

aber es wirft mich die Ausnahme unter:

java.lang.NoClassDefFoundError: scala/reflect/ClassManifest 
    at net.liftweb.json.JsonAST$JValue.extract(JsonAST.scala:300) 
    at aero.catec.stratio.ScalaExample$.parser(ScalaExample.scala:33) 
    at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48) 
    at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003) 
    at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083) 
    at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:575) 
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:560) 
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 

Die Sache ist die, dass, wenn ein Lauf das gleiche, ohne Funken mit (Lesen aus einer Datei) es funktioniert perfekt. Das Problem beginnt, wenn ich versuche, es in ein Funkenprogramm zu bringen. Auch wenn ich die Parser-Funktion in etwa so ändere:

def parser(json: String): JValue = { 
    val parsedJson = parse(json) 
    return (parsedJson \\ "ident") 
} 

Es funktioniert auch. Aber wenn ich versuche, den eigentlichen String zu extrahieren, bekomme ich den gleichen Fehler.

Vielen Dank für Ihre Hilfe. Ich hoffe ich habe es gut erklärt.

+1

Es ist wahrscheinlich ein Konflikt in der Scala-Version, die Sie verwenden. –

+0

Kann ich annehmen, dass "paso1.extract [PlaneInfo]" geparstJson.extract [PlaneInfo] sein sollte? – Gillespie

Antwort

2

Dies passiert, weil Sie eine Scala Reflect Abhängigkeit fehlen, die zum Serialisieren/Deserialisieren des Datensatzes benötigt wird. Versuchen Sie, das scala reflect-Glas hinzuzufügen, das der Spark-Version entspricht.

Tipp: "org.scala-lang" % "scala-widerspiegeln" % Version.scala

0

Oh, eine gute alte Frage :-)

Grundsätzlich deutet dies auf eine Version Problem: Einer Ihre Abhängigkeiten sind nicht mit dem Scala-Compiler kompatibel, den Sie gerade verwenden. Bist du auf 2,10?

Versuchen Googeln der Ausdruck "NoClassDefFoundError: Scala/reflektieren/ClassManifest", ich bin sicher, Sie finden mehr als genug Beschreibung über das Problem.