I have a Spark app that saves an RDD to HDFS with saveAsNewAPIHadoopDataset, using the AvroKeyOutputFormat. The Spark job fails with ClosedChannelException (DFSOutputStream.checkClosed): for large RDDs I eventually get so many ClosedChannelExceptions that the app finally aborts.
I have read somewhere that setting hadoopConf.set("fs.hdfs.impl.disable.cache", "false"); is supposed to help.
Here is how I save my RDD:
hadoopConf.set("fs.hdfs.impl.disable.cache", "false");
final Job job = Job.getInstance(hadoopConf);
FileOutputFormat.setOutputPath(job, outPutPath);
AvroJob.setOutputKeySchema(job, MyClass.SCHEMA$);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
rdd
    .mapToPair(new PreparePairForDatnum())
    .saveAsNewAPIHadoopDataset(job.getConfiguration());
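For completeness, PreparePairForDatnum is not shown above; with AvroKeyOutputFormat it would presumably be a PairFunction that wraps each record in an AvroKey and pairs it with NullWritable, roughly like this (a sketch only, assuming the RDD elements are already MyClass records; the class name comes from the question, the body is guessed):

import org.apache.avro.mapred.AvroKey;
import org.apache.hadoop.io.NullWritable;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// AvroKeyOutputFormat<MyClass> writes pairs of (AvroKey<MyClass>, NullWritable),
// so the mapper wraps each record in an AvroKey and uses NullWritable as the value.
public class PreparePairForDatnum
        implements PairFunction<MyClass, AvroKey<MyClass>, NullWritable> {

    @Override
    public Tuple2<AvroKey<MyClass>, NullWritable> call(MyClass record) {
        return new Tuple2<>(new AvroKey<>(record), NullWritable.get());
    }
}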
Here is the stack trace:
java.nio.channels.ClosedChannelException
at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1765)
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:108)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:458)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121)
at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:369)
at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:395)
at org.apache.avro.file.DataFileWriter.writeIfBlockFull(DataFileWriter.java:340)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:311)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1036)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1042)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1014)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Suppressed: java.nio.channels.ClosedChannelException
at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1765)
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:108)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:458)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121)
at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:369)
at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:395)
at org.apache.avro.file.DataFileWriter.sync(DataFileWriter.java:413)
at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:422)
at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:445)
at org.apache.avro.mapreduce.AvroKeyRecordWriter.close(AvroKeyRecordWriter.java:83)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1043)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1215)
... 8 more
I also see this happening when the container gets killed because it exceeds its memory limits. But thanks for pointing me at the scheduler etc.; I feel like that's what I need to look into next! – hba
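Side note on the memory-limit kills mentioned in that comment: on YARN this usually means the executor exceeded its heap plus off-heap overhead allocation, which can be raised via Spark configuration. A minimal sketch, with placeholder values and the Spark 1.x property name for the YARN overhead (not taken from the question):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public final class MemorySettingsExample {
    public static void main(String[] args) {
        // Placeholder values: give executors more heap and more off-heap overhead so
        // YARN is less likely to kill the container for exceeding its memory limit.
        SparkConf conf = new SparkConf()
                .setAppName("avro-save")
                .set("spark.executor.memory", "4g")
                .set("spark.yarn.executor.memoryOverhead", "1024"); // MB, Spark 1.x property name

        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... build the RDD and save it as shown in the question ...
        sc.stop();
    }
}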