2016-05-20 11 views
0

Ich mache meine ersten Schritte mit Scala. Ich arbeite mit Cloudera VM.Apache Spark: wie man einen Zähler erstellt

Ich habe eine CSV getrennt mit;. Ich möchte jede Zeile teilen und eine Val-Darlehen mit einem sequentiellen Zähler erstellen. Ich habe eine Karte geschrieben, um diese Aufgaben zu erledigen.

Mein Code ist unten

scala> val loans: RDD[(VertexId, ComplaintNodeDate)] = 
| sc.textFile("/home/cloudera/complaints_loan.csv").filter(!_.startsWith("DateReceived")). 
| map {line => 
| val row = line split ';' 
| var initialValue1 = initialValue2 + 1L 
| initialValue2 = initialValue1 
| (initialValue2, ComplaintLoan(row(0),row(1), row(2), row(3), row(4))) 
| } 

ich folgende Fehler

java.io.IOException: Failed to create local dir in /tmp/spark-3940587c-c7b4-460c-be02-02660ed17f05/blockmgr-d5286d12-401a-4d68-b8b9-7654d319800d/21. 
at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:73) 
at org.apache.spark.storage.DiskStore.contains(DiskStore.scala:167) 
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:404) 
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:805) 
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637) 
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:991) 
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98) 
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84) 
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) 
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) 
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) 
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051) 
at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:761) 
at org.apache.spark.SparkContext.textFile(SparkContext.scala:589) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:56) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:62) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72) 
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74) 
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:76) 
at $iwC$$iwC$$iwC.<init>(<console>:78) 
at $iwC$$iwC.<init>(<console>:80) 
at $iwC.<init>(<console>:82) 
at <init>(<console>:84) 
at .<init>(<console>:88) 
at .<clinit>(<console>) 
at .<init>(<console>:7) 
at .<clinit>(<console>) 
at $print(<console>) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) 
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856) 
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) 
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:874) 
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) 
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:874) 
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) 
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:874) 
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) 
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:874) 
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) 
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:874) 
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) 
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:874) 
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) 
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:874) 
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) 
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813) 
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656) 
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664) 
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) 
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944) 
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058) 
at org.apache.spark.repl.Main$.main(Main.scala:31) 
at org.apache.spark.repl.Main.main(Main.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) 
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) 
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 


scala> 

Ist es richtig, meinen Code erhalten?

Antwort

0

Nicht sicher, ob es möglich ist, einen Zähler durch lokale Variable zu pflegen, aber wenn Sie eine ID an Ihren Datensatz anhängen möchten. Sie können eins separat erstellen, es mit Ihrer loans Datei zippen und es dann in etwas umwandeln, das Sie wollen.

val numOfPartitions = 8 
val loans = sc.textFile("/home/cloudera/complaints_loan.csv", numOfPartitions). 
       filter(!_.startsWith("DateReceived")) 

val loansWithId = sc.parallelize(1 to loans.count.toInt, numOfPartitions). 
        zip(loans).map{ line => 
        val row = line._2.split(';') 
        (line._1, ComplaintLoan(row(0),row(1), row(2), row(3), row(4))) 

Hoffe es hilft!

+0

Vielen Dank. Ich habe Ihren Code überprüft. Aber ich bekomme die folgende Nachricht, wenn ich creativesWithID erstellen org.apache.hadoop.mapred.InvalidInputException: Eingabepfad existiert nicht: hdfs: //quickstart.cloudera: 8020/home/cloudera/beschwerden_loan.csv \t um Könnten Sie hilf mir, den Fehler zu beheben ?. Ich arbeite mit Cloudera V – user3712581

+0

Wie der Fehler vermuten lässt, findet es die Beschwerdedatei nicht. Probieren Sie diesen Befehl 'hadoop -fs -put localPathTo/claiments_loan.csv/home/cloudera' aus, bevor Sie Spark-shell starten oder spark-submit ausführen. – Psidom

1

1) Sie haben spark-shell und spark-submit im Namen des Benutzers spark in Cloudera auszuführen:

sudo -u spark spark-shell 

2) standardmäßig Funke liest Daten in Datenrahmen von HDFS, so müssen Sie es zuerst laden (mit hdfs-Befehlszeilentool oder Hue-Datei-Browser). Wenn Sie Daten, die aus Fahrern lokalen Dateisystem wollen, geben Sie explizites Protokoll file:///home/data/file.txt

3) Verwenden Sie zipWithIndex auf RDD Zeilen erhalten nummerieren:

loans.zipWithIndex()