2015-11-16 4 views
10

Ich bin ein Anfänger zu entfachen, und ich möchte verwandeln unter Quelle Datenrahmen (Last von JSON-Datei):Spark-Datenrahmen mehrere Zeilen Spalte Transformation

+--+-----+-----+ 
|A |count|major| 
+--+-----+-----+ 
| a| 1| m1| 
| a| 1| m2| 
| a| 2| m3| 
| a| 3| m4| 
| b| 4| m1| 
| b| 1| m2| 
| b| 2| m3| 
| c| 3| m1| 
| c| 4| m3| 
| c| 5| m4| 
| d| 6| m1| 
| d| 1| m2| 
| d| 2| m3| 
| d| 3| m4| 
| d| 4| m5| 
| e| 4| m1| 
| e| 5| m2| 
| e| 1| m3| 
| e| 1| m4| 
| e| 1| m5| 
+--+-----+-----+ 

In unter Ergebnis Datenrahmen:

+--+--+--+--+--+--+ 
|A |m1|m2|m3|m4|m5| 
+--+--+--+--+--+--+ 
| a| 1| 1| 2| 3| 0| 
| b| 4| 2| 1| 0| 0| 
| c| 3| 0| 4| 5| 0| 
| d| 6| 1| 2| 3| 4| 
| e| 4| 5| 1| 1| 1| 
+--+--+--+--+--+--+ 

Hier ist die Transformation Regel:

  1. Das Ergebnis Datenrahmen wird Namen mit A + (n major columns) wo die major Spalten bestand von angegeben sind:

    sorted(src_df.map(lambda x: x[2]).distinct().collect()) 
    
  2. Das Ergebnis Datenrahmen m Zeilen enthält, wobei die Werte für A Spalte vorgesehen sind:

    sorted(src_df.map(lambda x: x[0]).distinct().collect()) 
    
  3. Der Wert für jeden Major Spalte im Ergebnisdatenrahmen ist der Wert von dem Quelldatenrahmen auf dem entsprechenden A und Haupt (z.B. die Zählung in Zeile 1 in Quelle Datenrahmen wird die box abgebildet, wo Aa und Spalte m1)

  4. Die Kombinationen von A und major in Quelle Datenrahmen ist keine Duplizierung (bitte einen Primärschlüssel auf den beiden Säulen betrachten in SQL)

+0

Ich versuche, von ein paar Daten zu holen mit dem genauen Format aus der Tabelle vorhandenen Datenbank, wie jede Zeile pro ‚A‘ pro ‚major‘ ein Merkmal darstellt, zu einem 'A', also muss ich den Datenrahmen in das Format umwandeln, das ich gepostet habe, also kann ich die Daten in die ALS Funkenberechnung portieren. – resec

+0

Lassen Sie mich wissen, wenn ich den Funken falsch benutze und richtig diese Datentransformation außerhalb des Funkens machen soll. – resec

Antwort

8

Lets mit Beispieldaten starten:

df = sqlContext.createDataFrame([ 
    ("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"), 
    ("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"), 
    ("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"), 
    ("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"), 
    ("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"), 
    ("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"), 
    ("e", 1, "m4"), ("e", 1, "m5")], 
    ("a", "cnt", "major")) 

Bitte beachten Sie, dass esgeändert habebis cnt. Count ist in den meisten SQL-Dialekten ein reserviertes Schlüsselwort und für einen Spaltennamen keine gute Wahl.

Es gibt mindestens zwei Möglichkeiten, diese Daten neu zu gestalten:

  • Datenrahmen

    from pyspark.sql.functions import col, when, max 
    
    majors = sorted(df.select("major") 
        .distinct() 
        .map(lambda row: row[0]) 
        .collect()) 
    
    cols = [when(col("major") == m, col("cnt")).otherwise(None).alias(m) 
        for m in majors] 
    maxs = [max(col(m)).alias(m) for m in majors] 
    
    reshaped1 = (df 
        .select(col("a"), *cols) 
        .groupBy("a") 
        .agg(*maxs) 
        .na.fill(0)) 
    
    reshaped1.show() 
    
    ## +---+---+---+---+---+---+ 
    ## | a| m1| m2| m3| m4| m5| 
    ## +---+---+---+---+---+---+ 
    ## | a| 1| 1| 2| 3| 0| 
    ## | b| 4| 1| 2| 0| 0| 
    ## | c| 3| 0| 4| 5| 0| 
    ## | d| 6| 1| 2| 3| 4| 
    ## | e| 4| 5| 1| 1| 1| 
    ## +---+---+---+---+---+---+ 
    
  • groupBy über RDD

    from pyspark.sql import Row 
    
    grouped = (df 
        .map(lambda row: (row.a, (row.major, row.cnt))) 
        .groupByKey()) 
    
    def make_row(kv): 
        k, vs = kv 
        tmp = dict(list(vs) + [("a", k)]) 
        return Row(**{k: tmp.get(k, 0) for k in ["a"] + majors}) 
    
    reshaped2 = sqlContext.createDataFrame(grouped.map(make_row)) 
    
    reshaped2.show() 
    
    ## +---+---+---+---+---+---+ 
    ## | a| m1| m2| m3| m4| m5| 
    ## +---+---+---+---+---+---+ 
    ## | a| 1| 1| 2| 3| 0| 
    ## | e| 4| 5| 1| 1| 1| 
    ## | c| 3| 0| 4| 5| 0| 
    ## | b| 4| 1| 2| 0| 0| 
    ## | d| 6| 1| 2| 3| 4| 
    ## +---+---+---+---+---+---+ 
    
+0

Aus Neugierde, was bedeutet 'shaped1 = (df.select (col (" a "), * cols) .groupBy (" a ") .agg (* maxs) .na.fill (0))' stehen für in Scala? Ich habe Probleme, die * Spalten und * maxs zu verstehen. – eliasah

+1

@eliasah Es ist Argument Entpacken und ist mehr oder weniger gleichbedeutend mit so etwas wie 'cols: Seq [Column] = ???; df.select (cols: _ *) ' – zero323

+0

Ok danke! Aber wie wird dann das agg (* max) konvertiert? da agg einen Ausdruck nimmt. – eliasah

3

zero323 die Verwendung von Aggregation über Datenrahmen,

df = sqlContext.createDataFrame([ 
("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"), 
("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"), 
("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"), 
("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"), 
("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"), 
("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"), 
("e", 1, "m4"), ("e", 1, "m5")], 
("a", "cnt", "major")) 

Sie könnten auch

verwenden
reshaped_df = df.groupby('a').pivot('major').max('cnt').fillna(0) 
+0

Dies ist eine ausgezeichnete Lösung. Vielen Dank! Hier ist die Scala-Version, falls benötigt: val reshaped_df = transposed_res.groupBy ("a"). Pivot ("major"). Max ("cnt"). Na.fill (0) – Hako