2015-02-09 3 views
5

Wir haben hive Lager und wollten Funken für verschiedene Aufgaben verwendet wird (vor allem Klassifikation). Schreiben Sie die Ergebnisse manchmal als Strukturtabelle zurück. Zum Beispiel haben wir die folgende Python-Funktion geschrieben, um die Gesamtsumme von originally_table column two, gruppiert nach original_table column one, zu finden. Die Funktion funktioniert, aber wir befürchten, dass sie ineffizient ist, insbesondere die zu konvertierenden Schlüssel/Wert-Paare und die Wörterbuchversionen. Funktionen combiner, mergeValue, mergeCombiner sind an anderer Stelle definiert, funktionieren aber gut.zu lesen und von hive Tabellen mit Funken nach der Aggregation Schreiben

from pyspark import HiveContext 

rdd = HiveContext(sc).sql('from original_table select *') 

#convert to key-value pairs 
key_value_rdd = rdd.map(lambda x: (x[0], int(x[1]))) 

#create rdd where rows are (key, (sum, count) 
combined = key_value_rdd.combineByKey(combiner, mergeValue, mergeCombiner) 

# creates rdd with dictionary values in order to create schemardd 
dict_rdd = combined.map(lambda x: {'k1': x[0], 'v1': x[1][0], 'v2': x[1][1]}) 

# infer the schema 
schema_rdd = HiveContext(sc).inferSchema(dict_rdd) 

# save 
schema_rdd.saveAsTable('new_table_name') 

Gibt es effizientere Möglichkeiten, dasselbe zu tun?

+1

nicht sicher, warum Sie in eine rdd konvertieren müssen, aber wenn Sie darauf bestehen, können Sie einfach 'key_value_rdd.reduceByKey (Lambda x, y: Summe (x, y))' anstelle von 'combineByKey'. – mtoto

Antwort

0

... vielleicht war dies nicht möglich, wenn die Frage geschrieben wurde, aber es macht keinen Sinn, jetzt (Beitrag 1.3) die createDataFrame() aufrufen benutzen?

Nach dem ersten RDD bekommt, sieht es aus wie Sie den Anruf machen könnten, dann eine einfache SQL-Anweisung für die Struktur laufen die ganze Arbeit in einem Arbeitsgang zu erledigen. (Summe und Gruppierung) Außerdem kann die DataFrame-Struktur das Schema direkt nach der Erstellung ableiten, wenn ich das API-Dokument korrekt lese.

(http://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html#pyspark.sql.HiveContext)

0

Dieser Fehler kann durch Einstellung hive.exec.scratchdir zu dem Ordner, gelöst in dem Benutzer Zugriff hat

+1

Dies sollte Kommentar sein, denke ich. – ketan

+0

über welchen Fehler redest du? – mtoto

0

Welche Version von Funken Sie verwenden?

Diese Antwort bezieht sich auf 1.6 & unter Verwendung der Datenrahmen.

val sc = new SparkContext(conf) 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

import sqlContext.implicits._ 
val client = Seq((1, "A", 10), (2, "A", 5), (3, "B", 56)).toDF("ID", "Categ", "Amnt") 

    import org.apache.spark.sql.functions._ 
    client.groupBy("Categ").agg(sum("Amnt").as("Sum"), count("ID").as("count")).show() 


+-----+---+-----+ 
|Categ|Sum|count| 
+-----+---+-----+ 
| A| 15| 2| 
| B| 56| 1| 
+-----+---+-----+ 

Hoffe, das hilft !!