Ich habe Probleme herauszufinden, eine sichere Methode zum Anhängen an Dateien in HDFS
.Was ist die empfohlene Methode zum Anhängen an HDFS-Dateien?
Ich benutze eine kleine, 3-node Hadoop cluster (CDH v.5.3.9 to be specific)
. Unser Prozess ist ein Datenpipeliner, der multi-threaded (8 threads)
10 ist, und er hat eine Stufe, die Zeilen von Text mit Trennzeichen an Dateien in einem dedizierten Verzeichnis an HDFS
anhängt. Ich verwende Sperren, um den Zugriff der Threads auf die gepufferten Writer zu synchronisieren, die die Daten anhängen.
Meine erste Frage ist, über den Ansatz allgemein zu entscheiden.
Ansatz A ist die Datei zu öffnen, an sie anzuhängen und dann für jede angehängte Zeile zu schließen. Das scheint langsam und scheint zu viele kleine Blöcke zu erzeugen, oder zumindest sehe ich eine solche Stimmung in verschiedenen Beiträgen. Approach B Approach B ist es, die Brenner zwischenzuspeichern, aber regelmäßig zu aktualisieren, um sicherzustellen, dass die Liste der Writer nicht unbegrenzt wächst (derzeit ist es ein Writer für jede Eingabedatei, die vom Pipeliner verarbeitet wird). Dies scheint ein effizienterer Ansatz zu sein, aber ich stelle mir vor, offene Streams über einen bestimmten Zeitraum zu haben, aber kontrolliert könnte ein Problem sein, besonders für Ausgabedateileser (?)
Darüber hinaus sind meine wirklichen Probleme zwei. Ich bin mit dem FileSystem Java Hadoop API
das Anfügen zu tun und bin zeitweise diese zwei Ausnahmen immer:
org.apache.hadoop.ipc.RemoteException: failed to create file /output/acme_20160524_1.txt for DFSClient_NONMAPREDUCE_271210261_1 for client XXX.XX.XXX.XX because current leaseholder is trying to recreate file.
org.apache.hadoop.ipc.RemoteException: BP-1999982165-XXX.XX.XXX.XX-1463070000410:blk_1073760252_54540 does not exist or is not under Constructionblk_1073760252_545 40{blockUCState=UNDER_RECOVERY, primaryNodeIndex=1, replicas=[ReplicaUnderConstruction[[DISK]DS-ccdf4e55-234b-4e17-955f-daaed1afdd92:NORMAL|RBW], ReplicaUnderConst ruction[[DISK]DS-1f66db61-759f-4c5d-bb3b-f78c260e338f:NORMAL|RBW]]}
Wer auf eine dieser beiden irgendwelche Ideen?
Für das erste Problem habe ich Instrumentierungslogik in this post diskutiert versucht, aber schien nicht zu helfen.
Ich bin auch interessiert an der Rolle der dfs.support.append
Eigenschaft, wenn überhaupt anwendbar.
Mein Code für das Erhalten des Dateisystemes:
userGroupInfo = UserGroupInformation.createRemoteUser("hdfs"); Configuration conf = new Configuration();
conf.set(key1, val1);
...
conf.set(keyN, valN);
fileSystem = userGroupInfo.doAs(new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws Exception {
return FileSystem.get(conf);
}
});
Mein Code für den Output bekommen:
org.apache.hadoop.fs.path.Path file = ...
public OutputStream getOutputStream(boolean append) throws IOException {
OutputStream os = null;
synchronized (file) {
if (isFile()) {
os = (append) ? fs.append(file) : fs.create(file, true);
} else if (append) {
// Create the file first, to avoid "failed to append to non-existent file" exception
FSDataOutputStream dos = fs.create(file);
dos.close();
// or, this can be: fs.createNewFile(file);
os = fs.append(file);
}
// Creating a new file
else {
os = fs.create(file);
}
}
return os;
}
Es scheint, dass https://issues.apache.org/jira/browse/HDFS-7203 hier im Spiel sein kann. Ich habe einen Prototyp-Code, der einen einzelnen Thread verwaltet, der an eine einzelne Datei angehängt wird. Aber ich sehe immer noch die "aktuellen Pächter versucht, Datei wiederherzustellen" Ausnahmen. –