2016-04-12 4 views
-1

Entschuldigung für die Verwirrung in der ersten Frage. Hier ist eine Fragen mit dem reproduzierbaren Beispiel:So ersetzen Sie den RDD-Typ von [String] durch Werte vom RDD-Typ [String, Int]

Ich habe eine rdd [String] und ich habe eine RDD [String, Long]. Ich hätte gerne eine RDD von [Long] basierend auf der Übereinstimmung von String von Sekunde mit String von zuerst. Beispiel:

//Create RDD 
val textFile = sc.parallelize(Array("Spark can also be used for compute intensive tasks", 
     "This code estimates pi by throwing darts at a circle")) 
// tokenize, result: RDD[(String)] 
val words = textFile.flatMap(line => line.split(" ")) 
// create index of distinct words, result: RDD[(String,Long)] 
val indexWords = words.distinct().zipWithIndex() 

Als Ergebnis würde Ich mag ein RDD mit Indizes von Wörtern haben statt Worte in "Spark can also be used for compute intensive tasks".

Leider wieder und dank

+0

Ich schlage vor, Sie auf der Karte-Funktion einen Blick. –

+0

Da Sie in jeder RDD viele Arrays haben können, welches Array wird dann verwendet, um das Ergebnis zu erhalten? oder basierend auf Index? – iboss

+0

@iboss Die resultierende rdd wird 'Werte' von' y' basierend auf den passenden 'Schlüssel' mit' x' sein. – aigujin

Antwort

0

ich einen Fehler von SPARK-5063 wurde immer und this answer gegeben, fand ich die Lösung für mein Problem:

//broadcast `indexWords` 
val bcIndexWords = sc.broadcast(indexWords.collectAsMap) 
// select `value` of `indexWords` given `key` 
val result = textFile.map{arr => arr.split(" ").map(elem => bcIndexWords.value(elem))} 
result.first() 
res373: Array[Long] = Array(3, 7, 14, 6, 17, 15, 0, 12) 
0

Wenn ich Sie richtig verstehe, sind Sie in den Indizes von Werken interessiert, die auch in Spark can also be used for compute intensive tasks erscheinen.

Wenn ja - hier sind zwei Versionen mit identischen Ausgänge aber unterschiedliche Leistungsmerkmale:

val lookupWords: Seq[String] = "Spark can also be used for compute intensive tasks".split(" ") 

// option 1 - use join: 
val lookupWordsRdd: RDD[(String, String)] = sc.parallelize(lookupWords).keyBy(w => w) 
val result1: RDD[Long] = indexWords.join(lookupWordsRdd).map { case (key, (index, _)) => index } 

// option 2 - assuming list of lookup words is short, you can use a non-distributed version of it 
val result2: RDD[Long] = indexWords.collect { case (key, index) if lookupWords.contains(key) => index } 

Die erste Option erstellt eine zweite RDD mit den Worten, deren Indizes uns interessiert, verwendet keyBy sie verwandeln sich in ein PairRDD (mit Schlüssel == Wert!), join s es mit Ihrer indexWords RDD und dann Karten, um nur den Index zu erhalten. Die zweite Option sollte nur verwendet werden, wenn bekannt ist, dass die Liste der "interessanten Wörter" nicht zu groß ist - also können wir sie als Liste behalten (und nicht RDD), und Spark serialisieren und an die Arbeiter senden lassen für jede zu verwendende Aufgabe. Wir verwenden dann collect(f: PartialFunction[T, U]), die diese Teilfunktion anwendet, um einen "Filter" und eine "Map" auf einmal zu erhalten - wir geben nur einen Wert zurück, wenn die Wörter in der Liste existieren, und wenn ja - geben wir den Index zurück.

+0

Danke für die Antwort. Ich habe die Option 1 auf meine Daten angewendet und eine rdd dieses Typs erhalten: 'org.apache.spark.rdd.RDD [org.apache.spark.rdd.RDD [Long]]'. Mein Datensatz ist ein 'RDD [String]' und besteht aus Sätzen, also verwende ich 'map', um den Code an jeden Satz zu übergeben. Um "Join" zu machen, benutze ich 'sc.parallelize (x)' innerhalb von 'map'. Am Ende habe ich eine geschachtelte RDDs und ich weiß nicht, wie man zu einem 'String'-Typ kommt. – aigujin

+0

Das wird nicht funktionieren, Sie _cannot_ erstellen 'RDD [RDD [..]]' ', und Sie können nicht' SparkContext' innerhalb einer RDD-Umwandlung (z. B. 'map'), versuchen Sie es nicht einmal. Ich verstehe nicht wirklich, was du zu tun versuchst (sollte jedes einzelne Wort in jedem Satz seinem Index zugeordnet werden? Was ist der gewünschte Ergebnistyp?), Also kann ich keine Alternative vorschlagen, aber normalerweise immer dann, wenn du denkst, dass du es brauchst "geschachtelte RDDs" sollten Sie eigentlich 'RDD.join' oder' RDD.cartesian' verwenden. –

+0

Ihr Recht: Ich möchte, dass jedes Wort in einem Satz seinem Index in jedem Satz zugeordnet wird. Die Auflistung der Indizes ist bereits abgeschlossen und Sie haben gezeigt, wie Sie das Mapping in einem einzigen Satz durchführen können. Nun möchte ich es über 'map' Transformation auf eine rdd anwenden. Die gewünschte Ausgabe ist ein 'rdd [String]' mit ganzen Zahlen (anstelle von Wörtern) für jeden Satz. – aigujin