2015-10-28 15 views
14

Ich schreibe ein Modul, das ein beschreibbarer Stream ist. Ich möchte eine Pipe-Schnittstelle für meine Benutzer implementieren.Was ist eine korrekte Möglichkeit, pipe-lesbare Datenströme von schreibbaren in nodejs anzuhalten?

Wenn ein Fehler auftritt, muss ich den lesbaren Stream pausieren und ein Fehlerereignis ausgeben. Dann wird der Benutzer entscheiden - wenn er mit einem Fehler in Ordnung ist, sollte er in der Lage sein, mit der Datenverarbeitung fortzufahren.

var writeable = new BackPressureStream(); 
writeable.on('error', function(error){ 
    console.log(error); 
    writeable.resume(); 
}); 

var readable = require('fs').createReadStream('somefile.txt'); 
readable.pipe.(writeable); 

Ich sehe, dass der Knoten liefert uns readable.pause() Verfahren, das verwendet werden kann lesbaren Stream zu unterbrechen. Aber ich kann nicht, wie ich es von meinem beschreibbar Stream-Modul aufrufen können:

var Writable = require('stream').Writable; 

function BackPressureStream(options) { 
    Writable.call(this, options); 
} 
require('util').inherits(BackPressureStream, Writable); 

BackPressureStream.prototype._write = function(chunk, encoding, done) { 
    done(); 
}; 

BackPressureStream.prototype.resume = function() { 
    this.emit('drain'); 
} 

Wie Gegendruck kann in einem beschreibbaren Strom umgesetzt werden?

P.S. Es ist möglich, pipe/unpipe Ereignisse zu verwenden, die einen lesbaren Stream als Parameter bereitstellen. Es wird jedoch auch gesagt, dass für Pipe-Streams die einzige Möglichkeit zum Pausieren darin besteht, lesbare Datenströme von schreibbaren zu entfernen.

Habe ich es richtig gemacht? Ich muss meinen schreibbaren Stream entfernen, bis Benutzeraufrufe fortgesetzt werden? Und nachdem Benutzeranrufe fortgesetzt werden, sollte ich den lesbaren Strom zurückleiten?

+1

interessiert eine Prämie für diese in zu beginnen? –

+1

Hey, hast du eine Antwort auf deine Frage gefunden? –

Antwort

0

Grundsätzlich, wie ich es verstehe, möchten Sie im Falle eines Fehlerereignisses einen Rückdruck auf den Stream setzen. Sie haben ein paar Optionen.

Erstens, wie Sie bereits identifiziert haben, verwenden pipe eine Instanz des Lesestroms zu greifen und einige Beinarbeit zu tun.

Eine weitere Option ist es, eine Verpackung beschreibbaren Strom zu erzeugen, die diese Funktionalität bereitstellt (dh ein WritableStream als Eingabe verwendet, und wenn Strom Funktionen Implementierung übergibt die Daten zusammen mit dem mitgelieferten Strom.

Grundsätzlich Sie am Ende mit so etwas wie

source stream -> wrapping writable -> writable

https://nodejs.org/api/stream.html#stream_implementing_a_writable_stream beschäftigt sich mit einem beschreibbaren Strom umzusetzen.

Der Schlüssel fo Wenn Sie einen Fehler im zugrunde liegenden Schreibmodus bemerken, setzen Sie ein Flag für den Stream, und beim nächsten Aufruf von write würden Sie den Chunk puffern, den Callback speichern und nur aufrufen. So etwas wie

// ... 
constructor(wrappedWritableStream) { 
    wrappedWritableStream.on('error', this.errorHandler); 
    this.wrappedWritableStream = wrappedWritableStream; 
} 
// ... 
write(chunk, encoding, callback) { 
    if (this.hadError) { 
     // Note: until callback is called, this function won't be called again, so we will have maximum one stored 
     // chunk. 
     this.bufferedChunk = [chunk, encoding, callback]; 
    } else { 
     wrappedWritableStream.write(chunk, encoding, callback); 
    } 
} 
// ... 
errorHandler(err) { 
    console.error(err); 
    this.hadError = err; 
    this.emit(err); 
} 
// ... 
recoverFromError() { 
    if (this.bufferedChunk) { 
     wrappedWritableStream.write(...this.bufferedChunk); 
     this.bufferedChunk = undefined; 
    } 
    this.hadError = false; 
} 

Hinweis: Sie sollten nur die write Funktion implementieren müssen, aber ich Sie ermutigen, um zu graben und mit den anderen Durchführungsfunktionen zu spielen.

Es ist auch erwähnenswert, dass Sie möglicherweise Probleme haben, in Streams zu schreiben, die ein Fehlerereignis ausgegeben haben, aber ich werde das Ihnen als ein separates Problem überlassen, um zu lösen.

Hier ist eine andere gute Ressource auf backpressuring https://nodejs.org/en/docs/guides/backpressuring-in-streams/