2014-12-31 3 views
23

Wir verwenden die Stream-Funktionalität in RavenDB zu laden, zu transformieren und Migration von Daten zwischen zwei Datenbanken wie folgt:RavenDB-Stream für Unbounded Ergebnisse - Anschluss Resilience

var query = originSession.Query<T>(IndexForQuery); 

using (var stream = originSession.Advanced.Stream(query)) 
{ 
    while (stream.MoveNext()) 
    { 
     var streamedDocument = stream.Current.Document; 

     OpenSessionAndMigrateSingleDocument(streamedDocument); 
    } 
} 

Das Problem ist, dass eine der Sammlungen hat Millionen Zeilen, und halten wir ein IOException in folgendem Format empfangen:

Application: MigrateToNewSchema.exe 
Framework Version: v4.0.30319 
Description: The process was terminated due to an unhandled exception. 
Exception Info: System.IO.IOException 
Stack: 
    at System.Net.ConnectStream.Read(Byte[], Int32, Int32) 
    at System.IO.Compression.DeflateStream.Read(Byte[], Int32, Int32) 
    at System.IO.Compression.GZipStream.Read(Byte[], Int32, Int32) 
    at System.IO.StreamReader.ReadBuffer(Char[], Int32, Int32, Boolean ByRef) 
    at System.IO.StreamReader.Read(Char[], Int32, Int32) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadData(Boolean, Int32) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadStringIntoBuffer(Char) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseString(Char) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseValue() 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadInternal() 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.Read() 
    at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader) 
    at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader) 
    at Raven.Json.Linq.RavenJToken.ReadFrom(Raven.Imports.Newtonsoft.Json.JsonReader) 
    at Raven.Client.Connection.ServerClient+<YieldStreamResults>d__6b.MoveNext() 
    at Raven.Client.Document.DocumentSession+<YieldQuery>d__c`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MoveNext() 
    at MigrateToNewSchema.Migrator.DataMigratorBase`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MigrateCollection() 
    at MigrateToNewSchema.Program.MigrateData(MigrateToNewSchema.Enums.CollectionToMigrate, Raven.Client.IDocumentStore, Raven.Client.IDocumentStore) 
    at MigrateToNewSchema.Program.Main(System.String[]) 

Dies geschieht ziemlich langen Weg in Streaming und natürlich vorübergehende Verbindungsprobleme über diese Art von Zeit auftreten werden (es Stunden in Anspruch nimmt).

Wenn wir jedoch versuchen, wie wir eine Query verwenden, müssen wir von vorne anfangen. Also, wenn es schließlich einen Verbindungsfehler während der gesamten Stream gibt, dann müssen wir es erneut versuchen, und wieder, bis es Ende zu Ende funktioniert.

Ich weiß, dass Sie ETag mit Stream verwenden können, um effektiv an einem bestimmten Punkt neu zu starten, jedoch gibt es keine Überladung, dies mit einer Query zu tun, die wir die migrierten Ergebnisse filtern und die richtige Sammlung angeben müssen.

Gibt es also in RavenDB eine Möglichkeit, die interne Ausfallsicherheit der Verbindung (Verbindungszeichenfolgeneigenschaft, interne Einstellungen usw.) zu verbessern oder einen Stream bei einem Fehler effektiv "wiederherzustellen"?

+0

ich entdeckt habe [Data Abonnements] (http://ravendb.net/docs/article-page/3.0/csharp/client-api/data- Subscriptions/how-to-create-data-subscription), eine Funktion von RavenDb 3.0, die einen zuverlässigen Mechanismus für die Iteration über eine Sammlung von Dokumenten mit bestimmten Kriterien bietet und es Ihnen ermöglicht, einfach dort weiterzumachen, wo Sie aufgehört haben. Wenn jemand bereit wäre, einige Codebeispiele zusammenzustellen, die zeigen, wie diese Funktion diese Frage beantworten könnte, würde ich das für das Kopfgeld halten. – StriplingWarrior

+0

Sind Sie an die Verwendung einer Abfrage gebunden? Obwohl es ineffizienter ist, ist dies eine Migration, daher ist Speicher kein Problem - warum nicht die rohen doc-Sammlungen iterieren und In-Memory filtern, so dass Sie an einem Etag fortfahren können? So handle ich mit allem Streaming, ich benutze nie Abfragen. – kamranicus

+0

@StriplingWarrior Es ist eine Weile her :-) Ich arbeite nicht mehr für das Unternehmen mit RavenDB, aber das interessiert mich immer noch, also werde ich eine Antwort mit dem Datenabonnement-Code heute stecken –

Antwort

2

Nach dem Vorschlag von @StriplingWarrior habe ich die Lösung mit Data Subscriptions neu erstellt.

Mit diesem Ansatz konnte ich über alle 2 Millionen Zeilen iterieren (obwohl mit viel weniger Verarbeitung pro Element zugegeben); 2 Punkte hier, die dazu beigetragen haben würden, wenn wir die gleiche Logik Streams zu implementieren versuchen:

  1. Batches nur aus dem Abonnement entfernt bekommen „Warteschlange“ einmal bestätigt (wie die meisten Standard-Queues)
    1. Die abonnierten IObserver<T> muss erfolgreich abgeschlossen werden, damit diese Bestätigung gesetzt wird.
    2. Diese Informationen werden vom Server verarbeitet, anstatt der Client der Client so können im Abonnement verarbeitet neu zu starten, ohne die letzte erfolgreiche Position zu beeinflussen
    3. See here for more details
  2. Wie @StriplingWarrior angezeigt, weil Sie mit erstellen können Abonnements Filter bis hinunter zur Eigenschaftsebene wäre es möglich, mit einer kleineren Ergebnismenge im Falle einer Ausnahme innerhalb der Subskription selbst zu wiederholen.
    1. Der erste Punkt ersetzt dies wirklich; aber es erlaubt uns zusätzliche Flexibilität nicht in der Stream-API gesehen

Die Testumgebung ist eine RavenDB 3.0-Datenbank (lokale Rechner, als Windows-Dienst ausgeführt wird) mit den Standardeinstellung gegen eine Sammlung von 2 Millionen Datensätze.

-Code der Attrappe Datensätze zu erzeugen:

using (IDocumentStore store = GetDocumentStore()) 
{ 
    store.Initialize(); 

    using (var bulkInsert = store.BulkInsert()) 
    { 
     for (var i = 0; i != recordsToCreate; i++) 
     { 
      var person = new Person 
      { 
       Id = Guid.NewGuid(), 
       Firstname = NameGenerator.GenerateFirstName(), 
       Lastname = NameGenerator.GenerateLastName() 
      }; 

      bulkInsert.Store(person); 
     } 
    } 
} 

zu dieser Sammlung Anmeldung ist dann ein Fall von:

using (IDocumentStore store = GetDocumentStore()) 
{ 
    store.Initialize(); 

    var subscriptionId = store.Subscriptions.Create(new SubscriptionCriteria<Person>()); 

    var personSubscription = store.Subscriptions.Open<Person>(
     subscriptionId, new SubscriptionConnectionOptions() 
    { 
     BatchOptions = new SubscriptionBatchOptions() 
     { 
      // Max number of docs that can be sent in a single batch 
      MaxDocCount = 16 * 1024, 
      // Max total batch size in bytes 
      MaxSize = 4 * 1024 * 1024, 
      // Max time the subscription needs to confirm that the batch 
      // has been successfully processed 
      AcknowledgmentTimeout = TimeSpan.FromMinutes(3) 
     }, 
     IgnoreSubscribersErrors = false, 
     ClientAliveNotificationInterval = TimeSpan.FromSeconds(30) 
    }); 

    personSubscription.Subscribe(new PersonObserver()); 

    while (true) 
    { 
     Thread.Sleep(TimeSpan.FromMilliseconds(500)); 
    } 
} 

Note die PersonObserver; dies ist nur eine grundlegende Implementierung von IObserver wie so:

public class PersonObserver : IObserver<Person> 
{ 
    public void OnCompleted() 
    { 
     Console.WriteLine("Completed"); 
    } 

    public void OnError(Exception error) 
    { 
     Console.WriteLine("Error occurred: " + error.ToString()); 
    } 

    public void OnNext(Person person) 
    { 
     Console.WriteLine($"Received '{person.Firstname} {person.Lastname}'"); 
    } 
} 
+1

Nizza write-up. Ich fand es nützlich, eine 'Task' (oder eine' Task' basierend auf einem gegebenen 'CancellationToken') zu übergeben, und' wartet' auf die Aufgabe statt 'while (true)'. Auf diese Weise kann der aufrufende Code den Vorgang gefahrlos abbrechen, ohne den gesamten Thread oder Prozess zu löschen. Ich habe auch einen ETag-basierten Mechanismus entwickelt, um der Migration zu helfen, wenn es fertig ist, alle Zieldokumente zu treffen, damit es sich selbst stoppen kann, aber es ist ziemlich schwierig und nicht für alle Zwecke geeignet. – StriplingWarrior