2016-08-04 21 views
1

zip habe ich eine Funktion, die einzelne asynchrone Einfügung in db darstellen:Wie mehrere Observablen (1kk) ohne Vorbelegung sie in Array

function insertInDatabaseRx(data) { 
    return Rx.Observable.create(function (observer) { 
      db.insert(data, function (err) { 
       if (err) { 
        observer.onError(err); 
       } else { 
        observer.onCompleted(); 
       } 
      }); 
    }); 
} 

I Massen Insertion (1kk + mal) und einige Maßnahmen tun müssen nach allem fertig. Meine aktuelle Implementierung:

var someLongArray = ...; 

var massInsertion = []; 
for (var i = 0; i < someLongArray.length; i++) { 
    massInsertion.push(insertInDatabaseRx(someLongArray[i].data); 
} 

Rx.Observable.zip(massInsertion) 
    .subscribe(
     function (x) { 
      logger.debug('Next: %s', x); 
     }, 
     function (e) { 
      logger.error('Error: %s', e); 
     }, 
     function() { 
      //all done 
      logger.info('all save done, terminating ... '); 
      process.exit(); 
     }); 

Es gearbeitet, aber ich denke, es Ressource nicht (cpu/mem) effiziente Anordnung von Observablen vorzubelegen:

var massInsertion = []; 
for (var i = 0; i < someLongArray.length; i++) { 
    massInsertion.push(insertInDatabaseRx(someLongArray[i].data); 
} 

Wie Erfüllung dieser Aufgabe effizienter zu gestalten, ohne Vorbelegung Array von Observablen?

+0

erste, was ich bemerken ist, dass 'insertInDatabaseRx' nicht beobachtbar Vertrag nicht gehorcht. Sie sollten 'onCompleted' nicht ausgeben, nachdem Sie' onError' aufgerufen haben. Ein "else" ist alles was Sie brauchen, um dieses Bit zu reparieren. –

+0

@DaveMoten danke! – zella

+1

Woher kommen 'Daten'? Kommt es von einem Array? Sind Sie sicher, dass Ihre Datenbank keine Masseneinfügung durchführt? – paulpdaniels

Antwort

0

Sie könnten einen forkJoin verwenden()

function insertAll() { 
    var rxResult = insertInDatabaseRx(someLongArray[0].data); 

    for (var i = 1; i < someLongArray.length; i++) { 
     rxResult = rxResult.forkJoin(
      insertInDatabaseRx(someLongArray[i].data), 
      function (result1, result2) { 
       return result1 + result2; 
      }); 
    } 

    rxResult.map(...); 
}