2014-11-15 2 views
7

Ich benutze hadoop 2.4.1 und Spark 1.1.0. Ich habe einen Datensatz von Lebensmitteln Beitrag zu HDFS von here hochgeladen und dann habe ich den folgenden Code die Datei und verarbeiten es auf der Funken Shell zu lesen:Warum wirft Spark-Shell ArrayIndexOutOfBoundsException beim Lesen einer großen Datei aus HDFS?

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.mapreduce.Job 
import org.apache.hadoop.io.{LongWritable, Text} 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat 

var path = "hdfs:///user/hduser/finefoods.txt" 
val conf = new Configuration 
conf.set("textinputformat.record.delimiter", "\n\n") 
var dataset = sc.newAPIHadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString) 
var datasetObj = dataset.map{ rowStr => rowStr.split("\n")}  
var tupleSet = datasetObj.map(strArr => strArr.map(elm => elm.split(": ")(1))).map(arr => (arr(0),arr(1),arr(4).toDouble)) 
tupleSet.groupBy(t => t._2) 

Als ich die letzte Zeile tupleSet.groupBy(t => t._2) laufen, die Funken Schale wirft die folgende Ausnahme:

scala> tupleSet.groupBy(t => t._2).first() 
14/11/15 22:46:59 INFO spark.SparkContext: Starting job: first at <console>:28 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Registering RDD 11 (groupBy at <console>:28) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Got job 1 (first at <console>:28) with 1 output partitions (allowLocal=true) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Final stage: Stage 1(first at <console>:28) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 2) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Missing parents: List(Stage 2) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Submitting Stage 2 (MappedRDD[11] at groupBy at <console>:28), which has no missing parents 
14/11/15 22:46:59 INFO storage.MemoryStore: ensureFreeSpace(3592) called with curMem=221261, maxMem=278302556 
14/11/15 22:46:59 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.5 KB, free 265.2 MB) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from Stage 2 (MappedRDD[11] at groupBy at <console>:28) 
14/11/15 22:46:59 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 3 tasks 
14/11/15 22:46:59 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 3, localhost, ANY, 1221 bytes) 
14/11/15 22:46:59 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 2.0 (TID 4, localhost, ANY, 1221 bytes) 
14/11/15 22:46:59 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 3) 
14/11/15 22:46:59 INFO executor.Executor: Running task 1.0 in stage 2.0 (TID 4) 
14/11/15 22:46:59 INFO rdd.NewHadoopRDD: Input split: hdfs://10.12.0.245/user/hduser/finefoods.txt:0+134217728 
14/11/15 22:46:59 INFO rdd.NewHadoopRDD: Input split: hdfs://10.12.0.245/user/hduser/finefoods.txt:134217728+134217728 
14/11/15 22:47:02 ERROR executor.Executor: Exception in task 1.0 in stage 2.0 (TID 4) 
java.lang.ArrayIndexOutOfBoundsException 
14/11/15 22:47:02 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 2.0 (TID 5, localhost, ANY, 1221 bytes) 
14/11/15 22:47:02 INFO executor.Executor: Running task 2.0 in stage 2.0 (TID 5) 
14/11/15 22:47:02 INFO rdd.NewHadoopRDD: Input split: hdfs://10.12.0.245/user/hduser/finefoods.txt:268435456+102361028 
14/11/15 22:47:02 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 4, localhost): java.lang.ArrayIndexOutOfBoundsException: 

14/11/15 22:47:02 ERROR scheduler.TaskSetManager: Task 1 in stage 2.0 failed 1 times; aborting job 
14/11/15 22:47:02 INFO scheduler.TaskSchedulerImpl: Cancelling stage 2 
14/11/15 22:47:02 INFO scheduler.TaskSchedulerImpl: Stage 2 was cancelled 
14/11/15 22:47:02 INFO executor.Executor: Executor is trying to kill task 2.0 in stage 2.0 (TID 5) 
14/11/15 22:47:02 INFO executor.Executor: Executor is trying to kill task 0.0 in stage 2.0 (TID 3) 
14/11/15 22:47:02 INFO scheduler.DAGScheduler: Failed to run first at <console>:28 
14/11/15 22:47:02 INFO executor.Executor: Executor killed task 0.0 in stage 2.0 (TID 3) 
14/11/15 22:47:02 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 3, localhost): TaskKilled (killed intentionally) 
14/11/15 22:47:02 INFO executor.Executor: Executor killed task 2.0 in stage 2.0 (TID 5) 
14/11/15 22:47:02 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 2.0 (TID 5, localhost): TaskKilled (killed intentionally) 
14/11/15 22:47:02 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 4, localhost): java.lang.ArrayIndexOutOfBoundsException: 

Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) 
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
     at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
     at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Aber wenn ich Dummy-Dataset wie die folgenden verwenden, es funktioniert gut:

var tupleSet = sc.parallelize(List(
("B001E4KFG0","A3SGXH7AUHU8GW",3.0), 
("B001E4KFG1","A3SGXH7AUHU8GW",4.0), 
("B001E4KFG2","A3SGXH7AUHU8GW",4.0), 
("B001E4KFG3","A3SGXH7AUHU8GW",4.0), 
("B001E4KFG4","A3SGXH7AUHU8GW",5.0), 
("B001E4KFG5","A3SGXH7AUHU8GW",5.0), 
("B001E4KFG0","bbb",5.0) 
)) 

Jede Idee, w hy?

Antwort

9

Es gibt wahrscheinlich einen Eintrag im Dataset, der nicht dem Format folgt und deshalb: elm.split(": ")(1) fehlschlägt, weil es kein Element bei diesem Index gibt.

Sie können diesen Fehler vermeiden, indem Sie die Ergebnisse der Teilung überprüfen, bevor Sie auf den Index (1) zugreifen. Eine Möglichkeit, dies zu tun könnte so etwas wie diese:

var tupleSet = datasetObj.map(elem => elm.split(": ")).collect{case x if (x.length>1) x(1)} 

Eine Anmerkung: Ihre Beispiele scheinen nicht die Analyse-Pipeline in dem Code übereinstimmen. Sie enthalten keine ":" Token.

Da die Transformationen träge sind, wird Spark Ihnen nicht viel über Ihr Eingabe-Dataset erzählen (und Sie werden es vielleicht nicht bemerken), bis eine Aktion wie groupBy() ausgeführt wird.

0

Es könnte auch an leeren/leeren Zeilen in Ihrem Dataset liegen. Und Sie wenden eine Split-Funktion auf die Daten an. Filtern Sie in einem solchen Fall die leeren Zeilen.

ZB: myrdd.filter (. _ NonEmpty) .map (...)

0

Ich hatte ein ähnliches Problem, wenn ich ein Log-Daten in Datenrahmen mit pySpark konvertieren.

Wenn ein Protokolleintrag ungültig ist, habe ich einen Nullwert anstelle einer Zeileninstanz zurückgegeben. Vor dem Konvertieren in den Datenframe habe ich diese Nullwerte herausgefiltert. Aber trotzdem habe ich das obige Problem. Schließlich ging der Fehler weg, als ich eine Zeile mit Nullwerten anstelle eines einzelnen Nullwertes zurückgab.

Pseudo-Code unten:

Didnt Arbeit:

rdd = Parse log (log lines to Rows if valid else None) 
filtered_rdd = rdd.filter(lambda x:x!=None) 
logs = sqlContext.inferSchema(filtered_rdd) 

Arbeitete:

rdd = Parse log (log lines to Rows if valid else Row(None,None,...)) 
logs = sqlContext.inferSchema(rdd) 
filtered_rdd = logs.filter(logs['id'].isNotNull())