2016-05-26 12 views
2

Ich habe Probleme herauszufinden, eine sichere Methode zum Anhängen an Dateien in HDFS.Was ist die empfohlene Methode zum Anhängen an HDFS-Dateien?

Ich benutze eine kleine, 3-node Hadoop cluster (CDH v.5.3.9 to be specific). Unser Prozess ist ein Datenpipeliner, der multi-threaded (8 threads)10 ist, und er hat eine Stufe, die Zeilen von Text mit Trennzeichen an Dateien in einem dedizierten Verzeichnis an HDFS anhängt. Ich verwende Sperren, um den Zugriff der Threads auf die gepufferten Writer zu synchronisieren, die die Daten anhängen.

Meine erste Frage ist, über den Ansatz allgemein zu entscheiden.

Ansatz A ist die Datei zu öffnen, an sie anzuhängen und dann für jede angehängte Zeile zu schließen. Das scheint langsam und scheint zu viele kleine Blöcke zu erzeugen, oder zumindest sehe ich eine solche Stimmung in verschiedenen Beiträgen. Approach B Approach B ist es, die Brenner zwischenzuspeichern, aber regelmäßig zu aktualisieren, um sicherzustellen, dass die Liste der Writer nicht unbegrenzt wächst (derzeit ist es ein Writer für jede Eingabedatei, die vom Pipeliner verarbeitet wird). Dies scheint ein effizienterer Ansatz zu sein, aber ich stelle mir vor, offene Streams über einen bestimmten Zeitraum zu haben, aber kontrolliert könnte ein Problem sein, besonders für Ausgabedateileser (?)

Darüber hinaus sind meine wirklichen Probleme zwei. Ich bin mit dem FileSystem Java Hadoop API das Anfügen zu tun und bin zeitweise diese zwei Ausnahmen immer:

org.apache.hadoop.ipc.RemoteException: failed to create file /output/acme_20160524_1.txt for DFSClient_NONMAPREDUCE_271210261_1 for client XXX.XX.XXX.XX because current leaseholder is trying to recreate file.

org.apache.hadoop.ipc.RemoteException: BP-1999982165-XXX.XX.XXX.XX-1463070000410:blk_1073760252_54540 does not exist or is not under Constructionblk_1073760252_545 40{blockUCState=UNDER_RECOVERY, primaryNodeIndex=1, replicas=[ReplicaUnderConstruction[[DISK]DS-ccdf4e55-234b-4e17-955f-daaed1afdd92:NORMAL|RBW], ReplicaUnderConst ruction[[DISK]DS-1f66db61-759f-4c5d-bb3b-f78c260e338f:NORMAL|RBW]]}

Wer auf eine dieser beiden irgendwelche Ideen?

Für das erste Problem habe ich Instrumentierungslogik in this post diskutiert versucht, aber schien nicht zu helfen.

Ich bin auch interessiert an der Rolle der dfs.support.append Eigenschaft, wenn überhaupt anwendbar.

Mein Code für das Erhalten des Dateisystemes:

userGroupInfo = UserGroupInformation.createRemoteUser("hdfs"); Configuration conf = new Configuration(); 
conf.set(key1, val1); 
... 
conf.set(keyN, valN); 
fileSystem = userGroupInfo.doAs(new PrivilegedExceptionAction<FileSystem>() { 
    public FileSystem run() throws Exception { 
    return FileSystem.get(conf); 
    } 
}); 

Mein Code für den Output bekommen:

org.apache.hadoop.fs.path.Path file = ... 
public OutputStream getOutputStream(boolean append) throws IOException { 
    OutputStream os = null; 
    synchronized (file) { 
    if (isFile()) { 
     os = (append) ? fs.append(file) : fs.create(file, true); 
    } else if (append) { 
     // Create the file first, to avoid "failed to append to non-existent file" exception 
     FSDataOutputStream dos = fs.create(file); 
     dos.close(); 
     // or, this can be: fs.createNewFile(file); 
     os = fs.append(file); 
    } 
    // Creating a new file 
    else { 
     os = fs.create(file); 
    } 
    } 
    return os; 
} 
+0

Es scheint, dass https://issues.apache.org/jira/browse/HDFS-7203 hier im Spiel sein kann. Ich habe einen Prototyp-Code, der einen einzelnen Thread verwaltet, der an eine einzelne Datei angehängt wird. Aber ich sehe immer noch die "aktuellen Pächter versucht, Datei wiederherzustellen" Ausnahmen. –

Antwort

2

Ich habe Datei Anfügen mit CDH Arbeits 5.3/HDFS 2.5.0. Meine Schlussfolgerungen sind so weit wie folgt:

  • Kann haben nicht einen dedizierten Thread Appends pro Datei zu tun, oder mehrere Threads, um mehrere Dateien zu schreiben, ob wir Daten über ein und dieselbe Instanz des HDFS API Filesystem zu schreiben, oder verschiedene Instanzen.
  • Die Writer können nicht aktualisiert (d. H. Geschlossen und erneut geöffnet) werden; Sie müssen offen bleiben.
  • Dieses letzte Element führt zu gelegentlichen relativ seltenen ClosedChannelException, die wiederherstellbar scheint (durch erneutes Anfügen).
  • Wir verwenden einen einzelnen Thread-Executor-Dienst mit einer blockierenden Warteschlange (eine zum Anhängen an alle Dateien); ein Schreiber pro Datei, die Schreiber bleiben offen (bis zum Ende der Verarbeitung, wenn sie geschlossen sind).
  • Wenn wir auf CDH neuer als 5 aktualisieren.3, wir wollen dies noch einmal überprüfen und sehen, welche Threading-Strategie sinnvoll ist: ein und nur ein Thread, ein Thread pro Datei, mehrere Threads, die in mehrere Dateien schreiben. Außerdem sollten wir sehen, ob Schreiber regelmäßig geschlossen und wieder geöffnet werden können.
  • Darüber hinaus habe ich auch den folgenden Fehler gesehen, und konnte es durch Setzen von "dfs.client.block.write.replace-datanode-on-failure.policy" zu "NIE" auf der Client-Seite.
java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[XXX.XX.XXX.XX:50010, XXX.XX.XXX.XX:50010], original=[XXX.XX.XXX.XX:50010, XXX.XX.XXX.XX:50010]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration. 
     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.findNewDatanode(DFSOutputStream.java:969) ~[hadoop-hdfs-2.5.0.jar:?] 
     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1035) ~[hadoop-hdfs-2.5.0.jar:?] 
     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1184) ~[hadoop-hdfs-2.5.0.jar:?] 
     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:532) ~[hadoop-hdfs-2.5.0.jar:?]