2015-03-24 8 views
62

Ich habe den folgenden Beispielcode:Warum filter() nach flatMap() ist "nicht vollständig" in Java-Streams faul?

System.out.println(
     "Result: " + 
     Stream.of(1, 2, 3) 
       .filter(i -> { 
        System.out.println(i); 
        return true; 
       }) 
       .findFirst() 
       .get() 
); 
System.out.println("-----------"); 
System.out.println(
     "Result: " + 
     Stream.of(1, 2, 3) 
       .flatMap(i -> Stream.of(i - 1, i, i + 1)) 
       .flatMap(i -> Stream.of(i - 1, i, i + 1)) 
       .filter(i -> { 
        System.out.println(i); 
        return true; 
       }) 
       .findFirst() 
       .get() 
); 

Der Ausgang ist wie folgt:

1 
Result: 1 
----------- 
-1 
0 
1 
0 
1 
2 
1 
2 
3 
Result: -1 

Von hier aus sehe ich, dass stream wirklich in ersten Fall verhält sich träge - wir findFirst() verwenden, so, wenn wir zuerst haben Element Unser Filter-Lambda wird nicht aufgerufen. Im zweiten Fall, der flatMap verwendet, sehen wir jedoch, dass trotz des ersten Elements, das die Filterbedingung erfüllt (es ist nur irgendein erstes Element, da Lambda immer wahr zurückkehrt), weitere Inhalte des Stroms noch durch Filterfunktion zugeführt werden.

Ich versuche zu verstehen, warum es so verhält, anstatt aufzugeben, nachdem das erste Element wie im ersten Fall berechnet wurde. Jede hilfreiche Information würde geschätzt werden.

+0

Ich verstehe wirklich nicht Ihr Problem. Warum sollte es sich anders verhalten? –

+11

@PhilippSander: Denn wenn es sich träge verhält - wie im ersten Fall - würde es den Filter nur einmal auswerten. –

+0

ah .... es klickte, wenn ich Markos antworte –

Antwort

47

Beim Blick in die Umsetzung (ReferencePipeline.java) wir die Methode [link]

@Override 
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) { 
    do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink)); 
} 

die für findFirst Betrieb aufrufen wird, zu sehen. Das Besondere ist die sink.cancellationRequested(), mit der die Schleife bei der ersten Übereinstimmung beendet werden kann. Vergleichen Sie auf [link]

@Override 
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) { 
    Objects.requireNonNull(mapper); 
    // We can do better than this, by polling cancellationRequested when stream is infinite 
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, 
           StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 
     @Override 
     Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { 
      return new Sink.ChainedReference<P_OUT, R>(sink) { 
       @Override 
       public void begin(long size) { 
        downstream.begin(-1); 
       } 

       @Override 
       public void accept(P_OUT u) { 
        try (Stream<? extends R> result = mapper.apply(u)) { 
         // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 
         if (result != null) 
          result.sequential().forEach(downstream); 
        } 
       } 
      }; 
     } 
    }; 
} 

Verfahren für ein Stück voran landet forEach auf dem Teilstrom Aufruf ohne jede Möglichkeit für eine frühere Beendigung und den Kommentar am Anfang der flatMap Methode teilt auch diese abwesende Funktion.

Da es sich um mehr als nur eine Optimierung etwas wie impliziert es, dass der Code einfach bricht, wenn der Teilstrom unendlich ist, hoffe ich, dass die Entwickler bald beweisen, dass sie „kann als dies besser tun“ ...


Um die Implikationen zu veranschaulichen, während Stream.iterate(0, i->i+1).findFirst() wie erwartet funktioniert, wird Stream.of("").flatMap(x->Stream.iterate(0, i->i+1)).findFirst() in einer Endlosschleife enden.

...

Zwischenoperationen einen neuen Stream zurück:

die Spezifikation betrifft, können die meisten davon in der

chapter “Stream operations and pipelines” of the package specification finden. Sie sind immer faul;

...

... Laziness ermöglicht auch die Vermeidung alle Daten, die Prüfung, wenn es nicht notwendig ist; Für Operationen wie "finde die erste Zeichenfolge, die länger als 1000 Zeichen ist" ist es nur notwendig, gerade genug Zeichenfolgen zu untersuchen, um eine zu finden, die die gewünschten Eigenschaften aufweist, ohne alle Zeichenfolgen zu untersuchen, die von der Quelle verfügbar sind. (Dieses Verhalten wird umso wichtiger, wenn der Eingangsstrom unendlich ist und nicht nur groß.)

...

Des Weiteren werden einige Operationen als Kurzschließen Operationen. Eine Zwischenoperation ist kurzgeschlossen, wenn sie, wenn sie mit einer unendlichen Eingabe dargestellt wird, als Ergebnis einen endlichen Strom erzeugen kann. Eine terminale Operation ist kurzgeschlossen, wenn sie, wenn sie mit einer unendlichen Eingabe dargestellt wird, in einer endlichen Zeit enden kann. Eine Kurzschlußoperation in der Pipeline ist eine notwendige, aber nicht ausreichende Bedingung für die Verarbeitung eines unendlichen Stroms, der normalerweise in endlicher Zeit endet.

Es ist klar, dass ein Kurzschlussbetrieb keine endliche Zeitbeendigung, z. Wenn ein Filter nicht mit einem Element übereinstimmt, kann die Verarbeitung nicht abgeschlossen werden, aber eine Implementierung, die keine Beendigung in endlicher Zeit durch einfaches Ignorieren der Kurzschlußart einer Operation unterstützt, liegt weit außerhalb der Spezifikation.

+22

Dies a Obwohl es wahr sein kann, dass die Spezifikation dieses Verhalten unterstützt, erwartet niemand, dass das Abrufen des ersten Elements eines unendlichen Streams einen StackOverflowError auslöst oder in einer Endlosschleife enden wird, egal ob es direkt von der Quelle des Pipeline oder aus einem verschachtelten Stream über eine Mapping-Funktion n. Dies sollte als Fehler gemeldet werden. –

+0

Dies ist eine interessante Analyse des Codes selbst. Aber haben Sie irgendwelche Gedanken darüber, ob dieses Verhalten aufgrund der API-Garantien erwartet werden sollte? Ich fand Ihre [Kommentar zu der anderen Antwort] (http://stackoverflow.com/questions/29229373/why-filter-after-flatmap-is-not-completely-lazy-in-java-streams#comment46667431_29229513) interessant und war Ich hoffe, Ihre Antwort würde auch eine allgemeinere Diskussion enthalten. –

+1

Seltsam, dass es in dieser Hinsicht nicht viel um das Internet gibt. Glauben Sie nicht, dass niemand das vor mir entdeckt hat. Ich habe versucht, ein kombinatorisches Problem zu lösen, bei dem ich hoffte, den gesamten Explorationsraum nicht durch Lazy-Stream-Verhalten zu berechnen. Dies scheint jedoch nicht möglich zu sein. Soll ich das als Bug ablegen? Ich werde versuchen, es dann in Scala zu lösen. –

16

Die Elemente des Eingabestroms werden nacheinander getrunken. Das erste Element, 1, wird durch die zwei flatMap s in den Strom -1, 0, 1, 0, 1, 2, 1, 2, 3 umgewandelt, so dass der gesamte Strom nur dem ersten Eingabeelement entspricht. Die verschachtelten Ströme werden durch die Pipeline eifrig materialisiert, dann abgeflacht und dann der Stufe filter zugeführt. Dies erklärt Ihre Ausgabe.

Das obige stammt nicht aus einer grundsätzlichen Einschränkung, aber es würde wahrscheinlich die Dinge viel komplizierter machen, um die volle Faulheit für verschachtelte Ströme zu bekommen. Ich vermute, es wäre eine noch größere Herausforderung, sie leistungsfähig zu machen. Zum Vergleich erhalten Clojures Lazy Seqs für jede solche Verschachtelungsebene eine weitere Wrapping-Schicht. Aufgrund dieser Konstruktion können die Operationen sogar mit StackOverflowError fehlschlagen, wenn die Verschachtelung extrem ausgeführt wird.

+2

@MarkoTopolnik, danke Eigentlich ist die Sorge von Holger eigentlich Grund meiner Überraschung. Bedeutet der zweite Fall, dass ich flatMap nicht für unendliche Streams benutzen kann? –

+0

Ja, ich wette, dass der verschachtelte Stream nicht unendlich sein kann. –

7

In Bezug auf Bruch mit unendlichen Sub-Streams wird das Verhalten von flatMap noch mehr überraschend, wenn man einen Intermediate (im Gegensatz zu Terminal) Kurzschluss-Vorgang wirft.

Während die folgenden Arbeiten wie erwartet, die unendliche Folge von ganzen Zahlen Ausdrucken

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).forEach(System.out::println); 

der folgende Code druckt nur die „1“, aber immer noch tut nicht beenden:

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).limit(1).forEach(System.out::println); 

Ich kann mir nicht vorstellen, die Spezifikation zu lesen, in der das kein Fehler war.

5

In meiner kostenlosen StreamEx Bibliothek habe ich die Kurzschlusskollektoren eingeführt. Beim Sammeln von sequentiellem Strom mit kurzgeschlossenem Kollektor (wie MoreCollectors.first()) wird genau ein Element von der Quelle verbraucht. Intern wird es auf ziemlich schmutzige Weise implementiert: Verwenden einer benutzerdefinierten Ausnahme, um den Kontrollfluss zu unterbrechen.Mit meiner Bibliothek Ihrer Probe auf diese Weise neu geschrieben werden:

System.out.println(
     "Result: " + 
       StreamEx.of(1, 2, 3) 
       .flatMap(i -> Stream.of(i - 1, i, i + 1)) 
       .flatMap(i -> Stream.of(i - 1, i, i + 1)) 
       .filter(i -> { 
        System.out.println(i); 
        return true; 
       }) 
       .collect(MoreCollectors.first()) 
       .get() 
     ); 

Das Ergebnis ist folgendes:

-1 
Result: -1 
0

Ich stimme mit anderen Menschen dabei um einen Fehler bei JDK-8075939 geöffnet ist. Und da ist es immer noch nicht mehr als ein Jahr später. Ich möchte Ihnen empfehlen: AbacusUtil

N.println("Result: " + Stream.of(1, 2, 3).peek(N::println).first().get()); 

N.println("-----------"); 

N.println("Result: " + Stream.of(1, 2, 3) 
         .flatMap(i -> Stream.of(i - 1, i, i + 1)) 
         .flatMap(i -> Stream.of(i - 1, i, i + 1)) 
         .peek(N::println).first().get()); 

// output: 
// 1 
// Result: 1 
// ----------- 
// -1 
// Result: -1 

Disclosure: Ich bin der Entwickler AbacusUtil.