2016-08-03 27 views
3

Ich arbeite mit Spark, um Daten in S3 mit dem S3A URI zu schreiben.
Ich benutze auch den Endpunkt s3-external-1.amazonaws.com, um das Problem der späteren Konsistenz bei us-east1 zu vermeiden.Spark 1.6.1 S3 MultiObjectDeleteException

Das folgende Problem tritt auf, wenn einige Daten S3 zu schreiben versuchen (es ist eigentlich eine Bewegungsoperation):

com.amazonaws.services.s3.model.MultiObjectDeleteException: Status Code: 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS Error Message: One or more objects could not be deleted, S3 Extended Request ID: null 
    at com.amazonaws.services.s3.AmazonS3Client.deleteObjects(AmazonS3Client.java:1745) 
    at org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:687) 
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.cleanupJob(FileOutputCommitter.java:381) 
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:314) 
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:151) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56) 
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) 
    at org.apache.spark.sql.DataFrameWriter.orc(DataFrameWriter.scala:346) 
    at com.mgmg.memengine.stream.app.persistentEventStreamBootstrap$$anonfun$setupSsc$3.apply(persistentEventStreamBootstrap.scala:122) 
    at com.mgmg.memengine.stream.app.persistentEventStreamBootstrap$$anonfun$setupSsc$3.apply(persistentEventStreamBootstrap.scala:112) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
    at scala.util.Try$.apply(Try.scala:161) 

Objekt (e):

+0

Ich bekomme das gleiche Problem mit einem Non-Spark-Streaming-Job. Spark 1.6.2, Hadoop 2.6. Funktioniert auch nicht mit dem direkten Ausgabe-Committer. – Luke

Antwort

2

Dies kann auch durch Racebedingungen verursacht werden, in denen> 1 Prozess versucht, den Pfad zu löschen; HADOOP-14101 zeigt das.

In diesem speziellen Fall sollten Sie in der Lage sein, den Stack-Trace verschwinden zu lassen, indem Sie die Hadoop-Option fs.s3a.multiobjectdelete.enable auf false setzen.

Update 2017-02-23

Nachdem dies einige Tests geschrieben, ich habe es nicht geschafft, zum Löschen von nicht existierenden Pfade zu replizieren, sondern haben es für Berechtigungsprobleme getan. Angenommen, das ist der Grund für jetzt, obwohl wir mehr Stack-Traces begrüßen würden, um das Problem zu identifizieren. HADOOP-11572 behandelt das Problem, einschließlich Patches für die Dokumentation und für eine bessere Protokollierung des Problems (d. H. Protokollierung der fehlgeschlagenen Pfade und der spezifischen Fehler).

4

ich in dieses Problem lief, wenn ich ein Upgrade Spark 2.0.0, und es stellte sich heraus, dass es eine fehlende S3-Erlaubnis war. Ich führe gerade Spark 2.0.0 mit aws-java-sdk-1.7.4 und hadoop-aws-2.7.2 als Abhängigkeiten.

Um das Problem zu beheben, musste ich die s3:Delete* Aktion in die entsprechende IAM-Richtlinie hinzufügen. Je nachdem, wie Ihre Umgebung eingerichtet ist, kann dies eine Richtlinie für den S3-Bucket, eine Richtlinie für den Benutzer, dessen SECRET_KEY Ihre Hadoop s3a-Bibliothek verbindet, oder eine IAM-Rollenrichtlinie für die EC2-Instanz, in der Spark ausgeführt wird, sein.

In meinem Fall meine Arbeit IAM Rolle Politik sieht nun wie folgt aus:

{ 
    "Version": "2012-10-17", 
    "Statement": [ 
     { 
      "Effect": "Allow", 
      "Action": [ 
       "s3:Delete*", "s3:Get*", "s3:List*", "s3:PutObject" 
      ], 
      "Resource": "arn:aws:s3:::mybucketname/*" 
     } 
    ] 
} 

Dies entweder durch die S3 oder IAM AWS-Konsolen eine schnelle Änderung und sollte sofort anwenden, ohne einen Funken Cluster Neustart zu erfordern. Wenn Sie nicht sicher sind, wie Sie Richtlinien bearbeiten, habe ich ein wenig mehr Details zur Verfügung gestellt here.