2016-06-28 7 views
-1

Ich habe einige XML-Dateien & Ich habe diese in einem Kafka-Thema & Ich habe Dstream-Objekt des Kafka-Themas erstellt. Ich kann nicht weiter fortfahren, da ich die XML-Daten vom Thema analysieren möchte. Bitte, wenn jemand, der an der XML-Verarbeitung im Spark-Streaming gearbeitet hat, seine Eingaben für mich machen kann, um fortzufahren. Ich stehe damit aus den letzten 2 Tagen fest.Kafka Spark Streaming XML Parsing/Verarbeitung

Der Ansatz, den ich nehme ist XML-Dateien -> Kafka-Thema -> Verarbeitung im Spark-Streaming -> wieder in Kafka setzen.

Ich bin in der Lage, Daten zurück in Kafka Thema, aber nicht in der Lage, etwas mit den Daten aus dem Thema in Spark-Streaming zu tun.

+0

Könnten Sie Ihren Code hinzufügen und spezifisch sein, was das Problem ist und welche Fehler oder Ausnahmen erhalten Sie? – maasg

+0

@Harsha bin in ein gleiches Problem beim Lesen der Nachrichten von Kafka läuft bekommen die Nachrichten als jedes Tag als eine Nachricht. Könnten Sie mir bitte mitteilen, wie Sie das Problem gelöst haben? –

+0

@ankush reddy JAXB verwenden, um die XMLs mit ihren jeweiligen Schemas zu validieren – Harsha

Antwort

0

Welche Verarbeitung erwarten Sie?

Wenn Sie irgendeine Art von Datenextraktion erwarten, was Sie tun können, ist für jede Nachricht, konvertieren Sie sie in JSON (XML zu JSON ist sehr einfach) und erhalten Sie die JsonRDD und JsonRDD zu DF ist direkte Umwandlung. So können Sie eine Auswahl oder andere Operationen auf dem Datenrahmen ausführen.

Ich brauche einige Eingaben von Ihnen, um eine exakte Lösung zu bieten

1) Was Sie wollen aus den Daten.? 2) Ist der Datenrahmen aus den Daten ausreichend?

Wenn Sie mit der Eingabe erklären können, wäre das sehr hilfreich.

Ich habe einen Beispielcode hinzugefügt, um den Datenrahmen aus den XML-Daten zu erhalten.

val jsonStream = kafkaStream.transform(
     y => { 
     y.filter(x => x._1 != null && x._2 != null).map(x => { 
      XML.toJSONObject(x).toString(4); 
     } 
     ) 
     }) 


jsonStream.foreachRDD(x => { 
     val sqlContext = SQLContextSingleton.getInstance(x.sparkContext) 
     if (x != null) { 

     val df = sqlContext.read.json(x) 
     // Your DF Operations 
     } 
     } 
    } 
) 

object SQLContextSingleton { 

    @transient private var instance: HiveContext = _ 

    def getInstance(sparkContext: SparkContext): HiveContext = { 
    if (instance == null) { 
     sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false"); 
     sparkContext.hadoopConfiguration.set("spark.sql.parquet.mergeSchema", "true"); 
     sparkContext.hadoopConfiguration.set("spark.sql.parquet.cacheMetadata","false"); 
     instance = new HiveContext(sparkContext) 
    } 
    instance 
    } 
} 
+0

Hallo Srini, danke für deine schnelle Antwort. Das Problem wurde gelöst, es war ein sehr komplexer Anwendungsfall, bei dem wir 3 Arten von XMLs mit Spark-Streaming verknüpfen wollten. Endlich ist es geschafft. Wir haben JAXB verwendet, um die XMLs mit ihren jeweiligen Schemas zu validieren. Wie gesagt, der Anwendungsfall ist ziemlich komplex, es gibt viele Codierungen, daher teile ich keinen Code, der für mich funktioniert hat. Vielen Dank noch mal. – Harsha