2016-04-09 5 views
0

Ich erbte eine riesige JSON-Datei, die ich in elasticsearch indexieren möchte (nicht wirklich eine Datenbank, aber nicht hängen auf es sollte für die meisten db gelten aufnehmen). Ich benutze Knoten, um die Aufnahme zu machen. Ich habe Streams und Async versucht, aber ich bin ratlos Ich habe keinen Rahmen für die Annäherung an dieses Problem - ohne Speicherüberläufe und dergleichen.Index/Aufnahme sehr große JSON-Datei in die Datenbank mit node.js

Ich kann keine 1 bis 1 schreiben, aber es ist effektiv ein multidimensionales Objekt, das etwas aussieht: Ich habe die Dokumentation zu sich nehmen müssen

[ 
    { 
    document: { 
     type: 1, 
     type2: 2, 
     type3: {...} 
    }, 
    {...} 
] 

, kann ich den Elasticsearch-Client verwenden und sie in großen Mengen verarbeiten . Ich muss den Stream verlangsamen, parsen und klumpen.

Total fest ... Hilfe stackoverflow es ist Freitag ich will nach Hause gehen;).

+1

Haben Sie versucht, https://www.npmjs.com/package/json-parse-stream? – migg

+0

das kann die eine Version von JSON-Stream-Parsing Ich habe es nicht versucht, ich werde dieses jetzt versuchen. – unsalted

+0

ok einige Fortschritte mit diesem dieses Modul kann die Lösung sein. – unsalted

Antwort

1

Basierend auf migg's Vorschlag von json-parse-stream - die dritte JSON-Stream-Bibliothek, die ich ausprobierte - habe ich endlich eine Arbeitsaufnahme. Tatsache ist, dass es läuft, während ich das schreibe. Hoffentlich wird jemand das nützlich finden.

const fs = require('graceful-fs'); 
const parse = require('json-parse-stream'); 
const es = require('event-stream'); 
const client = new require('elasticsearch').Client(); 
var WritableBulk = require('elasticsearch-streams').WritableBulk; 
var TransformToBulk = require('elasticsearch-streams').TransformToBulk; 


var rs = fs.createReadStream('./resources/mydoc.json'); 

var bulkExec = function (body, callback) { 
    console.log(body); 
    client.bulk({ 
    index: 'my_index', 
    type: 'my_type', 
    body: body 
    }, callback); 
}; 

var toBulk = new TransformToBulk(() => { return { _index: 'my_index', _type: 'my_type' }; }); 


const done = (err, res) => { 
    if (err) { 
    console.log(err); 
    } 
    console.log(res); 
    console.log('go get a drink you deserve it'); 
}; 

var ws = new WritableBulk(bulkExec); 

rs.pipe(parse()) 
.pipe(es.mapSync(function (element) { 
    var a = []; 
    if (element.key === 'document') { 
    a = element.value; 
    return a; 
    } 
})) 
.pipe(toBulk) 
.pipe(ws).on('finish', done); 
+0

Schön, dass du deine Antwort hier reingibst. – migg