2016-04-26 4 views
3

Ich übe in der Spark-Shell nach Arten. Ich habe eine rdd mit etwa 10 Spalten/Variablen. Ich mag die ganze rdd auf den Werten der Spalte 7.Wie spalte ich ein Spark rdd Array [(String, Array [String])]?

rdd 
org.apache.spark.rdd.RDD[Array[String]] = ... 

Von sortieren, was ich sammle die Art und Weise zu tun, ist durch die Verwendung sortByKey, die wiederum nur auf Paare funktionieren. So kartiert ich es so würde ich ein Paar hat, bestehend aus column7 (String-Werten) und das vollständige Original rdd (Array von Strings)

rdd2 = rdd.map(c => (c(7),c)) 
rdd2: org.apache.spark.rdd.RDD[(String, Array[String])] = ... 

ich dann sortByKey gelte nach wie vor kein Problem ...

rdd3 = rdd2.sortByKey() 
rdd3: org.apache.spark.rdd.RDD[(String, Array[String])] = ... 

Aber jetzt Wie spalte ich ab, sammle und speichere das sortierte Original rdd von rdd3 (Array [String])? Immer wenn ich einen Split auf rdd3 versuche, gibt es mir einen Fehler:

val rdd4 = rdd3.map(_.split(',')(2)) 
<console>:33: error: value split is not a member of (String, Array[String]) 

Was mache ich hier falsch? Gibt es andere, bessere Möglichkeiten, eine RDD auf einer ihrer Spalten zu sortieren?

+1

ich nicht bekommen Sie genau wollen. meinst du, dass du jeden String innerhalb von Array [String] teilen willst? – jtitusj

+2

Sie haben versucht, Tuple zu teilen, deshalb ist es der Fehler –

+1

@John Nein Ich möchte rdd3 (ein sortiertes Paar von Spalte7 und die ursprüngliche rdd) teilen, also würde ich meine ursprüngliche rdd zurück haben, aber immer noch in Spalte 7 sortiert ... ohne tatsächlich die vorangestellte Spalte 7 (wie in rdd3). Ich habe die Frage leicht bearbeitet, ist es jetzt klarer? –

Antwort

2

, was Sie mit rdd2 = rdd.map(c => (c(7),c)) tat Karte es zu einem Tupel. rdd2: org.apache.spark.rdd.RDD[(String, Array[String])] genau wie es sagt :). Jetzt, wenn Sie den Datensatz teilen möchten, müssen Sie es aus diesem Tupel erhalten. Sie können wieder abzubilden, nur den zweiten Teil des Tupels nehmen (das ist die Anordnung von Array [String] ...) wie folgt: rdd3.map(_._2)

aber ich würde stark schlagen Gebrauch rdd.sortBy(_(7)) oder etwas dieser Art versuchen . Auf diese Weise müssen Sie sich nicht mit Tupel und dergleichen beschäftigen.

+0

Ich habe Ihren Vorschlag von rdd.sortBy (_. => _. 7) versucht, aber das löscht "error: identifier expected but '=>' found". Kannst du das bearbeiten, damit ich deine Antwort annehmen kann? Wie Sie vorschlagen, erledigt rdd3.map (_._ 2) die Aufgabe ebenfalls, erfordert aber etwas mehr Arbeit. –

+0

'.sortBy (c => c._7, false)' sollte funktionieren ....... halte das '_' (Unterstrich ...) –

+0

' .sortBy (c => c._7) 'won ' t funktioniert genauso gut wie '.sortBy (_._ 7)', da die Elemente in der rdd eine Array-Struktur haben. @KoenDeCouck, ich habe meine Antwort gepostet. Vielleicht möchten Sie es überprüfen. :) – jtitusj

0

Ich dachte, Sie mit Scala nicht vertraut machen, So unten sollten Ihnen helfen, mehr zu verstehen,

rdd3.map(kv => { 
    println(kv._1) // This represent String 
    println(kv._2) // This represent Array[String] 
}) 
1

dies nur tun:

val rdd4 = rdd3.map(_._2) 
2

, wenn Sie die rdd mit der 7. Zeichenfolge im Array sortieren möchten, können Sie es nur durch direkt tun

rdd.sortBy(_(6)) // array starts at 0 not 1 

oder

rdd.sortBy(arr => arr(6)) 

, dass Sie all die Probleme zu tun, sparen mehrere Transformationen. Der Grund, warum rdd.sortBy(_._7) oder rdd.sortBy(x => x._7) nicht funktioniert, liegt daran, dass Sie nicht auf ein Element in einem Array zugreifen. Um auf das 7. Element eines Arrays zuzugreifen, sagen Sie arr, sollten Sie arr(6) tun.

Um dies zu testen, habe ich folgendes:

val rdd = sc.parallelize(Array(Array("ard", "bas", "wer"), Array("csg", "dip", "hwd"), Array("asg", "qtw", "hasd"))) 

// I want to sort it using the 3rd String 
val sorted_rdd = rdd.sortBy(_(2)) 

Hier ist das Ergebnis:

Array(Array("ard", "bas", "wer"), Array("csg", "dip", "hwd"), Array("asg", "qtw", "hasd")) 
+0

Danke John! Diese Lösung sieht nach der besseren Sortiermethode aus. Ich akzeptiere Zahiros Antwort jedoch wegen der Art und Weise, wie die Frage formuliert wurde, mit Ihrer Lösung. (Upvoted this) –

+0

Ich uploated auch .. :) –