2015-08-04 22 views
8

Ich habe versucht, einen lesbaren und einen transformierten Stream zu verwenden, um eine sehr große Datei zu verarbeiten. Das Problem, auf das ich zu stoßen scheint, ist, dass das Programm beendet wird, bevor das Ergebnis zurückgegeben wird, wenn ich am Ende keinen beschreibbaren Stream anlege.Node.js Streams zum Umwandeln lesbar

Beispiel: rstream.pipe(split()).pipe(tstream)

My tstream hat einen Emitter, der emittiert, wenn ein Zähler einen Schwellenwert trifft. Wenn dieser Schwellenwert auf eine niedrige Zahl eingestellt ist, erhalte ich ein Ergebnis, aber wenn es hoch ist, gibt es nichts zurück. Wenn ich es an einen Dateischreiber übertrage, gibt es immer ein Ergebnis zurück. Fehle ich etwas Offensichtliches?

Code:

// Dependencies 
var fs = require('fs'); 
var rstream = fs.createReadStream('file'); 
var wstream = fs.createWriteStream('output'); 
var split = require('split'); // used for separating stream by new line 
var QTransformStream = require('./transform'); 

var qtransformstream = new QTransformStream(); 
qtransformstream.on('completed', function(result) { 
    console.log('Result: ' + result); 
}); 
exports.getQ = function getQ(filename, callback) { 

    // THIS WORKS if i have a low counter for qtransformstream, 
    // but when it's high, I do not get a result 
    // rstream.pipe(split()).pipe(qtransformstream); 

    // this always works 
    rstream.pipe(split()).pipe(qtransformstream).pipe(wstream); 

}; 

Hier ist der Code für die Qtransformstream

// Dependencies 
var Transform = require('stream').Transform, 
    util = require('util'); 
// Constructor, takes in the Quser as an input 
var TransformStream = function(Quser) { 
    // Create this as a Transform Stream 
    Transform.call(this, { 
     objectMode: true 
    }); 
    // Default the Qbase to 32 as an assumption 
    this.Qbase = 32; 
    if (Quser) { 
     this.Quser = Quser; 
    } else { 
     this.Quser = 20; 
    } 
    this.Qpass = this.Quser + this.Qbase; 
    this.Counter = 0; 
    // Variables used as intermediates 
    this.Qmin = 120; 
    this.Qmax = 0; 
}; 
// Extend the transform object 
util.inherits(TransformStream, Transform); 
// The Transformation to get the Qbase and Qpass 
TransformStream.prototype._transform = function(chunk, encoding, callback) { 
    var Qmin = this.Qmin; 
    var Qmax = this.Qmax; 
    var Qbase = this.Qbase; 
    var Quser = this.Quser; 
    this.Counter++; 
    // Stop the stream after 100 reads and emit the data 
    if (this.Counter === 100) { 
     this.emit('completed', this.Qbase, this.Quser); 
    } 
    // do some calcs on this.Qbase 

    this.push('something not important'); 
    callback(); 
}; 
// export the object 
module.exports = TransformStream; 
+0

Können Sie den Code für die 'QTransformStream' Implementierung bereitstellen? – mscdex

+0

Wie viele Zeilen haben Sie in der Eingabedatei und was ist der maximale Zählerwert in diesem Fall. Wenn der Zählerwert größer als die Zeilennummern ist, wird das Ereignis 'completed' nicht ausgegeben. Sie müssen auch 'null' drücken, um den Stream zu beenden. Nicht sicher, was du in "etwas nicht wichtig" hast, aber irgendwann sollte ein "Null" sein. – hassansin

+0

Es gibt weniger Linien als der Zähler, etwa 7000 Zeilen. Es funktioniert, wenn ich dies zu einem Schreibstrom pipe. Muss ein Transformations-Stream einen Push (Null) haben, damit er funktioniert? – ace040686

Antwort

6

EDIT:

Auch ich weiß nicht, wie hoch Ihr Zähler geht, aber wenn Sie Füllen Sie den Puffer, es wird aufhören, Daten an den Transform-Stream zu übergeben, in welchem ​​Fall completed nie wirklich getroffen wird, weil Sie neve r an die Gegengrenze kommen. Versuchen Sie, Ihre highwatermark zu ändern.

EDIT 2: ein wenig besser Erklärung

Wie Sie wissen ein transform streamein Duplex-Strom ist die im Grunde bedeutet, dass es Daten von einer Quelle akzeptieren, und es kann Daten an ein Ziel senden. Dies wird üblicherweise als Lesen bzw. Schreiben bezeichnet. Die transform stream erbt sowohl von read stream als auch von write stream, implementiert von Node.js. Es gibt jedoch eine Einschränkung, die transform streammuss nicht die Funktionen _read oder _write implementieren. In diesem Sinne können Sie es als das weniger bekannte passthrough stream betrachten.

Wenn Sie daran denken, dass die transform stream die write stream implementiert, müssen Sie auch darüber nachdenken, dass der Schreibstrom immer ein Ziel hat, um seinen Inhalt zu entladen. Das Problem, das Sie haben ist, dass, wenn Sie ein transform stream erstellen, Sie einen Ort nicht angeben können, um Ihre Inhalte zu senden. Die einzige Möglichkeit, Daten vollständig durch den Transformationsdatenstrom zu leiten, besteht darin, sie in einen Schreibstream zu pipettieren. Andernfalls werden Ihre Datenströme gesichert und können keine Daten mehr annehmen, da für die Daten kein Platz ist.

Aus diesem Grund funktioniert es immer, wenn Sie einen Schreibstream bereitstellen. Der Schreibstrom mindert die Datensicherung, indem die Daten an ein Ziel gesendet werden, sodass alle Ihre Daten weitergeleitet werden und das Ereignis "Vollständig" ausgegeben wird. Der Grund dafür, dass Ihr Code ohne den Schreibstream arbeitet, wenn die Stichprobengröße gering ist, besteht darin, dass Sie Ihren Stream nicht auffüllen, sodass der Transformationsdatenstrom genügend Daten annehmen kann, damit der vollständige Event/Schwellenwert erreicht werden kann . Wenn der Schwellenwert erhöht wird, bleibt die Datenmenge, die Ihr Stream annehmen kann, ohne sie an einen anderen Ort zu senden (ein Schreibstream), gleich. Dadurch wird Ihr Stream gesichert, und er kann keine Daten mehr annehmen, sodass das abgeschlossene Ereignis nie ausgegeben wird.

Ich würde sagen, dass, wenn Sie Ihre highwatermark für den Transform-Stream erhöhen Sie in der Lage, Ihre Schwelle zu erhöhen und immer noch den Code arbeiten. Diese Methode ist jedoch falsch. Rohr Stream auf einen Schreibstrom, der die Daten an dev/null den Weg schicken, dass die Write-Stream-creat ist:

var writer = fs.createWriteStream('/dev/null'); 

Der Abschnitt in dem Node.js docs auf buffering die Fehler erklären Sie in ausgeführt werden.

+0

Die Ströme in Knoten sind nicht so einfach wie sie aussehen. Ich würde gerne eine gute detaillierte Erklärung für diese Feinheiten sehen. – thorn

+0

Ich habe versucht, eine bessere Erklärung zu geben, lassen Sie mich wissen, wenn es einen Teil davon gibt, die nicht klar sind. – RadleyMith

1

Sie unterbrechen nicht _transform und Prozess geht weit weit weg. Versuchen:

this.emit('completed', ...); 
this.end(); 

Deshalb 'Programm zu beenden scheint, bevor das Ergebnis zurückgegeben wird'

Und nicht ausgeben nutzlose Daten:

var wstream = fs.createWriteStream('/dev/null'); 

Viel Glück)

1

Ich würde vorschlagen, eine Writable anstelle eines Transform-Stream zu verwenden. Benennen Sie dann _transform in _write um, und Ihr Code wird den Stream konsumieren, wenn Sie ihn pipen. Ein Transformations-Stream, wie @Bradgnar bereits darauf hingewiesen hat, benötigt einen Consumer oder es wird stop the readable streamen, mehr Daten in seinen Puffer zu pushen.