
Spark job fails because of ClosedChannelException (DFSOutputStream.checkClosed)

I have a Spark app. I save an RDD to HDFS with saveAsNewAPIHadoopDataset, using AvroKeyOutputFormat.

For large RDDs I eventually get so many ClosedChannelExceptions that the app ultimately aborts.

I read somewhere that setting hadoopConf.set("fs.hdfs.impl.disable.cache", "false"); helps.

Here is how I save my RDD:

    hadoopConf.set("fs.hdfs.impl.disable.cache", "false");
    final Job job = Job.getInstance(hadoopConf);
    FileOutputFormat.setOutputPath(job, outPutPath);
    AvroJob.setOutputKeySchema(job, MyClass.SCHEMA$);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);

    rdd
        .mapToPair(new PreparePairForDatnum())
        .saveAsNewAPIHadoopDataset(job.getConfiguration());

Here is the stack trace:

java.nio.channels.ClosedChannelException 
    at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1765) 
    at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:108) 
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58) 
    at java.io.DataOutputStream.write(DataOutputStream.java:107) 
    at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:458) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121) 
    at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216) 
    at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150) 
    at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:369) 
    at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:395) 
    at org.apache.avro.file.DataFileWriter.writeIfBlockFull(DataFileWriter.java:340) 
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:311) 
    at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77) 
    at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1036) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034) 
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1042) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1014) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
    Suppressed: java.nio.channels.ClosedChannelException 
     at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1765) 
     at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:108) 
     at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58) 
     at java.io.DataOutputStream.write(DataOutputStream.java:107) 
     at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:458) 
     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121) 
     at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216) 
     at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150) 
     at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:369) 
     at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:395) 
     at org.apache.avro.file.DataFileWriter.sync(DataFileWriter.java:413) 
     at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:422) 
     at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:445) 
     at org.apache.avro.mapreduce.AvroKeyRecordWriter.close(AvroKeyRecordWriter.java:83) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1043) 
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1215) 
     ... 8 more 

Answer


This can happen when an executor has been killed. Look in your logs for something like this:

2016-07-20 22:00:42,976 | WARN | org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint | Container container_e10838_1468831508103_1724_01_055482 on host: hostName was preempted. 
2016-07-20 22:00:42,977 | ERROR | org.apache.spark.scheduler.cluster.YarnClusterScheduler | Lost executor 6 on hostName: Container container_e10838_1468831508103_1724_01_055482 on host: hostName was preempted. 

If you find such an entry, the executor running your task was preempted by the YARN application master. In other words, it was killed and its resources were handed over to another queue. More about preemption and YARN scheduling can be found here and here.
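
If preemption is the cause, one mitigation is to make the job more tolerant of lost executors and to submit it to a queue that is less likely to be preempted. A minimal sketch, assuming a YARN queue named "high-priority" exists on the cluster (the queue name is a placeholder); spark.task.maxFailures and spark.yarn.queue are standard Spark-on-YARN properties:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    SparkConf conf = new SparkConf()
        .setAppName("avro-save-job")
        // tolerate more task failures caused by preempted executors (default is 4)
        .set("spark.task.maxFailures", "10")
        // submit to a YARN queue that is less likely to be preempted (placeholder name)
        .set("spark.yarn.queue", "high-priority");
    JavaSparkContext sc = new JavaSparkContext(conf);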


I also see this happen when the container gets killed because it exceeds memory limits. But thanks for pointing me at the scheduler etc.; I have a feeling that is what I need to look into next! – hba
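
For the memory-limit case mentioned in the comment, giving each YARN container more headroom beyond the executor heap often helps. A minimal sketch, assuming Spark 1.x property names (spark.yarn.executor.memoryOverhead was later renamed spark.executor.memoryOverhead); the values are placeholders to be tuned for the cluster:

    import org.apache.spark.SparkConf;

    SparkConf conf = new SparkConf()
        // executor JVM heap
        .set("spark.executor.memory", "4g")
        // extra off-heap headroom (MB) that YARN accounts for per container; raising it
        // helps when containers are killed for exceeding physical memory limits
        .set("spark.yarn.executor.memoryOverhead", "1024");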