Ich habe versucht, einen lesbaren und einen transformierten Stream zu verwenden, um eine sehr große Datei zu verarbeiten. Das Problem, auf das ich zu stoßen scheint, ist, dass das Programm beendet wird, bevor das Ergebnis zurückgegeben wird, wenn ich am Ende keinen beschreibbaren Stream anlege.Node.js Streams zum Umwandeln lesbar
Beispiel: rstream.pipe(split()).pipe(tstream)
My tstream
hat einen Emitter, der emittiert, wenn ein Zähler einen Schwellenwert trifft. Wenn dieser Schwellenwert auf eine niedrige Zahl eingestellt ist, erhalte ich ein Ergebnis, aber wenn es hoch ist, gibt es nichts zurück. Wenn ich es an einen Dateischreiber übertrage, gibt es immer ein Ergebnis zurück. Fehle ich etwas Offensichtliches?
Code:
// Dependencies
var fs = require('fs');
var rstream = fs.createReadStream('file');
var wstream = fs.createWriteStream('output');
var split = require('split'); // used for separating stream by new line
var QTransformStream = require('./transform');
var qtransformstream = new QTransformStream();
qtransformstream.on('completed', function(result) {
console.log('Result: ' + result);
});
exports.getQ = function getQ(filename, callback) {
// THIS WORKS if i have a low counter for qtransformstream,
// but when it's high, I do not get a result
// rstream.pipe(split()).pipe(qtransformstream);
// this always works
rstream.pipe(split()).pipe(qtransformstream).pipe(wstream);
};
Hier ist der Code für die Qtransformstream
// Dependencies
var Transform = require('stream').Transform,
util = require('util');
// Constructor, takes in the Quser as an input
var TransformStream = function(Quser) {
// Create this as a Transform Stream
Transform.call(this, {
objectMode: true
});
// Default the Qbase to 32 as an assumption
this.Qbase = 32;
if (Quser) {
this.Quser = Quser;
} else {
this.Quser = 20;
}
this.Qpass = this.Quser + this.Qbase;
this.Counter = 0;
// Variables used as intermediates
this.Qmin = 120;
this.Qmax = 0;
};
// Extend the transform object
util.inherits(TransformStream, Transform);
// The Transformation to get the Qbase and Qpass
TransformStream.prototype._transform = function(chunk, encoding, callback) {
var Qmin = this.Qmin;
var Qmax = this.Qmax;
var Qbase = this.Qbase;
var Quser = this.Quser;
this.Counter++;
// Stop the stream after 100 reads and emit the data
if (this.Counter === 100) {
this.emit('completed', this.Qbase, this.Quser);
}
// do some calcs on this.Qbase
this.push('something not important');
callback();
};
// export the object
module.exports = TransformStream;
Können Sie den Code für die 'QTransformStream' Implementierung bereitstellen? – mscdex
Wie viele Zeilen haben Sie in der Eingabedatei und was ist der maximale Zählerwert in diesem Fall. Wenn der Zählerwert größer als die Zeilennummern ist, wird das Ereignis 'completed' nicht ausgegeben. Sie müssen auch 'null' drücken, um den Stream zu beenden. Nicht sicher, was du in "etwas nicht wichtig" hast, aber irgendwann sollte ein "Null" sein. – hassansin
Es gibt weniger Linien als der Zähler, etwa 7000 Zeilen. Es funktioniert, wenn ich dies zu einem Schreibstrom pipe. Muss ein Transformations-Stream einen Push (Null) haben, damit er funktioniert? – ace040686