Why does spark-shell throw ArrayIndexOutOfBoundsException when reading a large file from HDFS?

I am using Hadoop 2.4.1 and Spark 1.1.0. I uploaded a dataset of food reviews to HDFS from here, and then used the following code in the spark-shell to read and process the file:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val path = "hdfs:///user/hduser/finefoods.txt"

// Records in the file are separated by a blank line, so use "\n\n" as the record delimiter.
val conf = new Configuration
conf.set("textinputformat.record.delimiter", "\n\n")

// Read each record as one string.
val dataset = sc.newAPIHadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString)

// Split each record into its lines, take the value after ": " on each line,
// and keep fields 0, 1 and 4 as (productId, userId, score).
val datasetObj = dataset.map { rowStr => rowStr.split("\n") }
val tupleSet = datasetObj.map(strArr => strArr.map(elm => elm.split(": ")(1))).map(arr => (arr(0), arr(1), arr(4).toDouble))

tupleSet.groupBy(t => t._2)
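For reference, each record in the file is a block of key: value lines, which is what the parsing above relies on. As far as I can tell a record looks roughly like this (ids borrowed from the dummy data further down; arr(0) is the product id, arr(1) the user id, arr(4) the score):

product/productId: B001E4KFG0
review/userId: A3SGXH7AUHU8GW
review/profileName: ...
review/helpfulness: 1/1
review/score: 5.0
review/time: ...
review/summary: ...
review/text: ...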
When I run the last line, tupleSet.groupBy(t => t._2), the spark-shell throws the following exception:
scala> tupleSet.groupBy(t => t._2).first()
14/11/15 22:46:59 INFO spark.SparkContext: Starting job: first at <console>:28
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Registering RDD 11 (groupBy at <console>:28)
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Got job 1 (first at <console>:28) with 1 output partitions (allowLocal=true)
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Final stage: Stage 1(first at <console>:28)
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 2)
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Missing parents: List(Stage 2)
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Submitting Stage 2 (MappedRDD[11] at groupBy at <console>:28), which has no missing parents
14/11/15 22:46:59 INFO storage.MemoryStore: ensureFreeSpace(3592) called with curMem=221261, maxMem=278302556
14/11/15 22:46:59 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.5 KB, free 265.2 MB)
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from Stage 2 (MappedRDD[11] at groupBy at <console>:28)
14/11/15 22:46:59 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 3 tasks
14/11/15 22:46:59 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 3, localhost, ANY, 1221 bytes)
14/11/15 22:46:59 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 2.0 (TID 4, localhost, ANY, 1221 bytes)
14/11/15 22:46:59 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 3)
14/11/15 22:46:59 INFO executor.Executor: Running task 1.0 in stage 2.0 (TID 4)
14/11/15 22:46:59 INFO rdd.NewHadoopRDD: Input split: hdfs://10.12.0.245/user/hduser/finefoods.txt:0+134217728
14/11/15 22:46:59 INFO rdd.NewHadoopRDD: Input split: hdfs://10.12.0.245/user/hduser/finefoods.txt:134217728+134217728
14/11/15 22:47:02 ERROR executor.Executor: Exception in task 1.0 in stage 2.0 (TID 4)
java.lang.ArrayIndexOutOfBoundsException
14/11/15 22:47:02 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 2.0 (TID 5, localhost, ANY, 1221 bytes)
14/11/15 22:47:02 INFO executor.Executor: Running task 2.0 in stage 2.0 (TID 5)
14/11/15 22:47:02 INFO rdd.NewHadoopRDD: Input split: hdfs://10.12.0.245/user/hduser/finefoods.txt:268435456+102361028
14/11/15 22:47:02 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 4, localhost): java.lang.ArrayIndexOutOfBoundsException:
14/11/15 22:47:02 ERROR scheduler.TaskSetManager: Task 1 in stage 2.0 failed 1 times; aborting job
14/11/15 22:47:02 INFO scheduler.TaskSchedulerImpl: Cancelling stage 2
14/11/15 22:47:02 INFO scheduler.TaskSchedulerImpl: Stage 2 was cancelled
14/11/15 22:47:02 INFO executor.Executor: Executor is trying to kill task 2.0 in stage 2.0 (TID 5)
14/11/15 22:47:02 INFO executor.Executor: Executor is trying to kill task 0.0 in stage 2.0 (TID 3)
14/11/15 22:47:02 INFO scheduler.DAGScheduler: Failed to run first at <console>:28
14/11/15 22:47:02 INFO executor.Executor: Executor killed task 0.0 in stage 2.0 (TID 3)
14/11/15 22:47:02 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 3, localhost): TaskKilled (killed intentionally)
14/11/15 22:47:02 INFO executor.Executor: Executor killed task 2.0 in stage 2.0 (TID 5)
14/11/15 22:47:02 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 2.0 (TID 5, localhost): TaskKilled (killed intentionally)
14/11/15 22:47:02 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 4, localhost): java.lang.ArrayIndexOutOfBoundsException:
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
But when I use a dummy dataset like the following, it works fine:
val tupleSet = sc.parallelize(List(
  ("B001E4KFG0","A3SGXH7AUHU8GW",3.0),
  ("B001E4KFG1","A3SGXH7AUHU8GW",4.0),
  ("B001E4KFG2","A3SGXH7AUHU8GW",4.0),
  ("B001E4KFG3","A3SGXH7AUHU8GW",4.0),
  ("B001E4KFG4","A3SGXH7AUHU8GW",5.0),
  ("B001E4KFG5","A3SGXH7AUHU8GW",5.0),
  ("B001E4KFG0","bbb",5.0)
))
Any idea why?
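If it helps, here is a minimal sketch for flagging records in the real file that would break the parsing above (assuming every well-formed record has at least five key: value lines):

// A record breaks elm.split(": ")(1) if any of its lines has no ": "
// separator, and breaks arr(4) if it has fewer than five lines.
val badRecords = dataset.filter { rowStr =>
  val lines = rowStr.split("\n")
  lines.length < 5 || lines.exists(_.split(": ").length < 2)
}
println(badRecords.count())          // how many records are malformed?
badRecords.take(3).foreach(println)  // inspect a few offenders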