2016-04-04 5 views
0

generieren Ich studiere Funken auf virtualbox. Ich benutze ./bin/spark-shell, um Funken zu öffnen und scala zu benutzen. Jetzt war ich verwirrt über das Schlüsselwertformat mit Hilfe von Scala.Wie Schlüssel-Wert-Format mit Hilfe von Scala in Funken

Ich habe eine txt-Datei in home/Feng/Funken/Daten, die wie folgt aussieht:

panda 0 
pink 3 
pirate 3 
panda 1 
pink 4 

ich sc.textFile verwenden diese txt-Datei zu erhalten. Wenn ich

val rdd = sc.textFile("/home/feng/spark/data/rdd4.7") 

tun Dann kann ich rdd.collect() verwenden rdd auf dem Bildschirm zu zeigen:

scala> rdd.collect() 
res26: Array[String] = Array(panda 0, pink 3, pirate 3, panda 1, pink 4) 

Wenn ich jedoch

val rdd = sc.textFile("/home/feng/spark/data/rdd4.7.txt") 

die kein“.txt tun " Hier. Dann, wenn ich rdd.collect() verwende, habe ich einen Fehler:

org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/feng/spark/A.txt 
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285) 
...... 

Aber ich sah andere Beispiele. Alle haben am Ende ".txt". Stimmt etwas nicht mit meinem Code oder meinem System?

Eine andere Sache ist, wenn ich zu tun versucht:

scala> val rddd = rdd.map(x => (x.split(" ")(0),x)) 
rddd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:29 
scala> rddd.collect() 
res0: Array[(String, String)] = Array((panda,panda 0), (pink,pink 3), (pirate,pirate 3), (panda,panda 1), (pink,pink 4)) 

ich soll die erste Spalte der Daten auszuwählen und zu verwenden es als Schlüssel

. Aber rddd.collect() sieht nicht so aus, da die Wörter zweimal vorkommen, was nicht richtig ist. Ich kann die restlichen Operationen wie mapbykey, reduebykey oder andere nicht fortsetzen. Wo habe ich falsch gemacht?

Jede Hilfe wird wirklich geschätzt.

+0

Ihre Frage scheint mit Ihrer Verwendung von ".txt" etwas inkonsistent zu sein. Können Sie Ihren Text und Ihre Code-Einfügungen überprüfen, um sicherzustellen, dass alles korrekt ist? Wenn dies der Fall ist, scheint Ihr System wirklich durcheinander zu sein. – Phasmid

Antwort

1

Nur zum Beispiel erstelle ich ein String mit Ihrem Dataset, danach ich die Platte durch eine Linie geteilt, und verwenden Sie SparkContext ‚s parallelize Methode ein RDD zu erstellen. Beachten Sie, dass ich nach der Erstellung der die Methode map verwende, um die in jedem Datensatz gespeicherte String zu teilen und in eine zu konvertieren.

import org.apache.spark.sql.Row 
val text = "panda 0\npink 3\npirate 3\npanda 1\npink 4" 

val rdd = sc.parallelize(text.split("\n")).map(x => Row(x.split(" "):_*)) 
rdd.take(3) 

Die Ausgabe aus dem take Methode ist:

res4: Array[org.apache.spark.sql.Row] = Array([panda,0], [pink,3], [pirate,3]) 

Über Ihre erste Frage, gibt es keine Notwendigkeit für Dateien eine beliebige Erweiterung haben. Denn in diesem Fall werden Dateien als reiner Text betrachtet.