2016-06-16 10 views
0

Ich lese einen Stream, in dem ich in einem bestimmten Zeitintervall nach Json-Werten suche. Alles gelesen, vor und während der Werte im Zeitintervall möchte ich cachen. Sobald der Stream alle Werte im Zeitintervall gelesen hat (chronologische Reihenfolge vorausgesetzt), sollte ich aufhören zu lesen.Scala-Stream lesen, bis die Kriterien erfüllt sind

Dieser Code speichert alle Werte in einer veränderbaren Liste und filtert dann nur nach json im Datumsbereich. Aber es hört nicht auf zu lesen, sobald der Filter abgeschlossen ist, es speichert alles im Stream zwischen.

val cache: MutableList[JsValue] = MutableList.empty 

pg.pins.flatten 
    .map(record => {cache += record; record}) 
    .filter(js => (js \ "created_at").as[Date].after(start) && (js \ "created_at").as[Date].before(end)) 

Jede Hilfe wird geschätzt.

Antwort

1

Sie brauchen keine veränderbare Liste dafür (vermeiden Sie die Verwendung von veränderbaren Strukturen, es sei denn, es gibt einen wirklichen Grund, warum Sie müssen). Unter der Annahme der pg.pins ist ein Stream und dass die Daten nach dem Datum sortiert wird (das Ihre Frage impliziert zu sein scheint), so etwas wie dies tun sollten, was Sie wollen:

val (after, before) = pg.pins.flatten 
    .takeWhile { js => (js \ "createdAt").as[Date].before(end) } 
    .partition { js => (js \ "createdAt").as[Date].after(start) } 
+0

Das ist großartig, es ist genau die Art von Vereinfachung I war auf der Suche nach. Ich weiß, dass ich kein veränderbares, wenn möglich, verwenden sollte, aber ich weiß nicht, wie ich es sonst tun soll. Was Sie gezeigt haben, umfasst Daten mit dem Datumsbereich, aber ich muss weiterhin alle Daten zwischenspeichern, bevor ich den Filterpunkt (separat) erreiche. Irgendwelche Vorschläge? – jay

+0

Oh, ich wusste nicht, dass du es behalten willst. Siehe aktualisierte Antwort. – Dima

+0

Perfect danke – jay