2016-06-07 9 views
0

Um ein paar ML-Algorithmen auszuführen, muss ich zusätzliche Datenspalten erstellen. Jede dieser Spalten enthält einige ziemlich intensive Berechnungen, bei denen es darum geht, gleitende Durchschnitte zu behalten und Informationen aufzuzeichnen, während Sie durch jede Zeile gehen (und sie währenddessen aktualisieren). Ich habe ein simples Python-Skript durchgespielt und es funktioniert, und ich suche derzeit, es in ein Scala Spark-Skript zu übersetzen, das auf einem größeren Datensatz ausgeführt werden könnte.Ist es ineffizient, Spark SQL-Datenrahmen manuell zu iterieren und Spaltenwerte zu erstellen?

Das Problem ist, dass es scheint, dass für diese hoch effizient, mit Spark SQL, ist es bevorzugt, die eingebaute Syntax und Operationen (die SQL-like sind) zu verwenden. Die Codierung der Logik in einem SQL-Ausdruck scheint ein sehr gedankenintensiver Prozess zu sein. Daher frage ich mich, was die Schattenseiten sind, wenn ich die neuen Spaltenwerte manuell durch Wiederholung jeder Zeile, Verfolgung der Variablen und Einfügen der Spalte erstelle Wert am Ende.

+2

Diese Frage ist zu breit zu beantworten. Bitte überprüfen Sie Ihre Frage mit genauen Angaben zu dem, was Sie versucht haben, und auch eine [MVCE] (http://stackoverflow.com/help/mcve) – eliasah

+0

"Jede dieser Spalten beinhaltet einige ziemlich intensive Berechnungen, die das Halten von gleitenden Durchschnitten und das Aufzeichnen von Informationen als beinhaltet Sie gehen durch jede Zeile (und aktualisieren sie in der Zwischenzeit) "- Ihre Berechnungen erfordern einen statusübergreifenden Vorgang. Wie funktioniert das in einer verteilten Umgebung, in der sich Ihre Daten in mehreren Partitionen befinden? Wenn Sie keinen globalen Status benötigen, wie lautet die Definition des "Fensters" für die Verwaltung des Status (im Sinne von SQL-Fensterfunktionen)? – Sim

Antwort

0

Sie können eine RDD in einen Dataframe konvertieren. Verwenden Sie dann die Karte im Datenrahmen und bearbeiten Sie jede Zeile nach Ihren Wünschen. Wenn Sie eine neue Spalte hinzufügen müssen, können Sie mitColumn verwenden. Dies erlaubt jedoch nur eine Spalte hinzuzufügen, und es passiert für den gesamten Datenrahmen. Wenn Sie weitere Spalten hinzufügen möchten, verwenden Sie die interne Kartenmethode

a. Sie können neue Werte basierend auf den Berechnungen sammeln

b. Hinzufügen, diese neue Spaltenwerte Haupt RDD wie unten

val newColumns: Seq[Any] = Seq(newcol1,newcol2) 
Row.fromSeq(row.toSeq.init ++ newColumns) 

Hier Zeile ist die Referenzreihe von in der Karte Methode

c. Erstellen Sie ein neues Schema wie folgt:

val newColumnsStructType = StructType{Seq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType)) 

d. Zum alten Schema hinzufügen

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType) 

e. Erstellen Sie einen neuen Datenrahmen mit neuen Spalten

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema) 
+0

Die Sache ist, ich bin mir nicht sicher, ob die SQL-Abfrage oder die Kartenfunktion sehr effizient sein wird. Ich benutze "dynamische Programmierung" -ish-Stil, um wiederkehrende Variablen zu behalten, die verwendet werden, um die Traversierung effizienter zu machen. –

+0

DataFrames API ist derjenige mit einer guten Leistungsverbesserung. Spark SQL ist mehr für die Verwendung von Spark für SQL- und HIVE-Abfragen. Definitiv gibt es darunter einige Optimierungen, da die DataFrames unten auf dem Schema arbeiten. – Ramzy