2012-11-19 6 views
9

Ich schreibe eine C# (.NET 4.5) Anwendung, die verwendet wird, um zeitbasierte Ereignisse für Berichtszwecke zu aggregieren. Um meine Abfragelogik für Echtzeit- und Verlaufsdaten wiederverwendbar zu machen, nutze ich die Reaktive Erweiterungen (2.0) und ihre IScheduler Infrastruktur (HistoricalScheduler und Freunde).Warum wirft Observable.Generate() System.StackOverflowException?

Zum Beispiel nehmen wir eine Liste von Ereignissen erstellen (chronologisch sortiert, aber sie können zusammenfallen!), Deren einzige Nutzlast ist ihre Zeitstempel und wollen ihre Verteilung über Puffer von befristetem wissen:

const int num = 100000; 
const int dist = 10; 

var events = new List<DateTimeOffset>(); 
var curr = DateTimeOffset.Now; 
var gap = new Random(); 

var time = new HistoricalScheduler(curr); 

for (int i = 0; i < num; i++) 
{ 
    events.Add(curr); 
    curr += TimeSpan.FromMilliseconds(gap.Next(dist)); 
} 

var stream = Observable.Generate<int, DateTimeOffset>(
    0, 
    s => s < events.Count, 
    s => s + 1, 
    s => events[s], 
    s => events[s], 
    time); 

stream.Buffer(TimeSpan.FromMilliseconds(num), time) 
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count)); 

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist)); 

Ausführen dieses Codes führt zu einem System.StackOverflowException mit dem folgenden Stack-Trace (Es sind die letzten drei Linien den ganzen Weg nach unten):

mscorlib.dll!System.Threading.Interlocked.Exchange<System.IDisposable>(ref System.IDisposable location1, System.IDisposable value) + 0x3d bytes  
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x37 bytes  
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes  
... 
System.Reactive.Core.dll!System.Reactive.Disposables.AnonymousDisposable.Dispose() + 0x4d bytes  
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x4f bytes  
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes  
... 

Ok, scheint das Problem aus meiner Verwendung von Observable.Generate() zu kommen, auf der Liste abhängig Größe (num) und unabhängig von der Wahl des Schedulers.

Was mache ich falsch? Oder allgemeiner, was wäre der bevorzugte Weg, um eine IObservable von einer IEnumerable von Ereignissen zu erstellen, die ihre eigenen Zeitstempel bieten?

+1

Wie groß kann 'num' sein, bevor Sie auf diesen Fehler stoßen? Wenn Sie dies im Debugger in einem Schritt ausführen, wird die letzte Codezeile ausgeführt, bevor der Fehler angezeigt wird. –

+0

Für mich scheint der kritische Schwellenwert bei ~ 'num = 51600' zu liegen (in Release config, ein bisschen weniger in Debug config). Die beobachtbare Sequenz scheint vollständig erstellt zu sein. Ich kann Haltepunkte bei den lamdba-Ausdrücken für "Observable.Generate()" treffen. Die Ausnahme wird nach dem letzten Aufruf von 'Console.WriteLine()' ausgelöst. –

+1

Verstehen Sie, dies ist nur eine Vermutung, aber es sieht verdächtig aus, dass der Stream versucht, jedes Element zu entfernen, und jedes Element versucht, den Stream zu entsorgen. Sie enden damit, was im Wesentlichen rekursive Aufrufe zu "Abbrechen" oder "Dispose" sind, die Ihren Stack (Standardgröße von 1 Megabyte) bläst. Ich kenne Observable nicht genug, um zu sagen, warum das passiert. –

Antwort

3

(update - erkannte ich nicht eine Alternative haben: siehe bei unten der Antwort)

Das Problem ist, wie Observable.Generate funktioniert - es wird verwendet, um einen corecursive (denke Rekursion drehte sich nach außen) Generator basierend auf den Argumenten zu entfalten; Wenn diese Argumente einen sehr genested corecursive Generator generieren, werden Sie Ihren Stack sprengen.

Von diesem Punkt an, Ich spekuliere viel (habe nicht die Quelle Rx vor mir) (siehe unten), aber ich bin bereit zu wetten, Ihre Definition endet in etwas wie erweitern :

initial_state => 
generate_next(initial_state) => 
generate_next(generate_next(initial_state)) => 
generate_next(generate_next(generate_next(initial_state))) => 
generate_next(generate_next(generate_next(generate_next(initial_state)))) => ... 

Und weiter und weiter, bis Ihr Aufrufstapel groß genug wird, um zu überlaufen.Zum Beispiel eine Methodensignatur + Ihr int-Zähler, das wäre etwa 8-16 Bytes pro rekursivem Aufruf (mehr abhängig davon, wie der Zustandsautomaten-Generator implementiert ist), also 60.000 klingt richtig (1M/16 ~ 62500 Maximum Tiefe)

EDIT: die Quelle nach oben gezogen - bestätigt: die "Run" Methode der so aussieht wie generieren - nehmen Kenntnis von den verschachtelten Anrufe Generate:

protected override IDisposable Run(
    IObserver<TResult> observer, 
    IDisposable cancel, 
    Action<IDisposable> setSink) 
{ 
    if (this._timeSelectorA != null) 
    { 
     Generate<TState, TResult>.α α = 
       new Generate<TState, TResult>.α(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
     setSink(α); 
     return α.Run(); 
    } 
    if (this._timeSelectorR != null) 
    { 
     Generate<TState, TResult>.δ δ = 
       new Generate<TState, TResult>.δ(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
     setSink(δ); 
     return δ.Run(); 
    } 
    Generate<TState, TResult>._ _ = 
      new Generate<TState, TResult>._(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
    setSink(_); 
    return _.Run(); 
} 

EDIT: Derp, nicht Angebot irgendwelche Alternativen ... hier ist eine, die funktionieren könnte:

(BEARBEITEN: örtlich festgelegt Enumerable.Range, also wird Stromgröße nicht durchmultipliziert)

const int num = 160000; 
const int dist = 10; 

var events = new List<DateTimeOffset>(); 
var curr = DateTimeOffset.Now; 
var gap = new Random(); 
var time = new HistoricalScheduler(curr); 

for (int i = 0; i < num; i++) 
{ 
    events.Add(curr); 
    curr += TimeSpan.FromMilliseconds(gap.Next(dist)); 
} 

    // Size too big? Fine, we'll chunk it up! 
const int chunkSize = 10000; 
var numberOfChunks = events.Count/chunkSize; 

    // Generate a whole mess of streams based on start/end indices 
var streams = 
    from chunkIndex in Enumerable.Range(0, (int)Math.Ceiling((double)events.Count/chunkSize) - 1) 
    let startIdx = chunkIndex * chunkSize 
    let endIdx = Math.Min(events.Count, startIdx + chunkSize) 
    select Observable.Generate<int, DateTimeOffset>(
     startIdx, 
     s => s < endIdx, 
     s => s + 1, 
     s => events[s], 
     s => events[s], 
     time); 

    // E pluribus streamum 
var stream = Observable.Concat(streams); 

stream.Buffer(TimeSpan.FromMilliseconds(num), time) 
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count)); 

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist)); 
+0

Danke, das ist perfekt! Scheint auch effizienter zu sein als meine eigene Problemumgehung. Ich musste jedoch einen kleinen Fehler in Ihrer Arithmetik beheben (siehe Bearbeiten). Ich verstehe immer noch nicht, warum die rekursive Implementierung in RX benötigt wird. Schließlich scheint es mit RX v1.0 zu funktionieren (weit über 60.000). Trotzdem, nette Untersuchung, clevere Lösung. Danke noch einmal! –

+0

Kein Problem! Heh - ich bin wirklich beeindruckt, ich hatte nur einen * mathematischen Fehler ...;) – JerKimball

3

OK, ich habe eine andere Factory-Methode gewählt, die keine lamdba-Ausdrücke als Zustandsübergänge benötigt und jetzt sehe ich keine Stack-Überläufe mehr. Ich bin noch nicht sicher, ob dies als richtige Antwort auf meine Frage qualifizieren würde, aber es funktioniert und ich dachte, Id teilen sich hier:

var stream = Observable.Create<DateTimeOffset>(o => 
    { 
     foreach (var e in events) 
     { 
      time.Schedule(e,() => o.OnNext(e)); 
     } 

     time.Schedule(events[events.Count - 1],() => o.OnCompleted()); 

     return Disposable.Empty; 
    }); 

manuell die Ereignisse Planung vor scheint das Abonnement Rückkehr (!) peinlich für mich, aber in diesem Fall kann es innerhalb der Lambda-Ausdruck getan werden.

Wenn an dieser Vorgehensweise etwas nicht stimmt, korrigieren Sie mich bitte. Außerdem wäre ich immer noch glücklich zu hören, welche impliziten Annahmen von System.Reactive ich mit meinem ursprünglichen Code verletzt habe.

(Oh mein Gott, ich hätte überprüft, dass früher: mit RX v1.0, wird die ursprüngliche Observable.Generate() tatsächlich zu funktionieren scheint!)