2015-06-09 9 views
15

Ich versuche zu verstehen, ob es eine Möglichkeit gibt, die Reduktionsoperation zu beenden, ohne den gesamten Strom zu untersuchen, und ich kann keinen Weg finden.Java 8: Unterbrechen der Reduktionsoperation von der Untersuchung aller Stream-Elemente

Der Anwendungsfall ist ungefähr wie folgt: Lassen Sie eine lange Liste von Integer s, die in eine Accumulator gefaltet werden muss. Jede Elementprüfung ist potentiell teuer, also führe ich innerhalb der Accumulator eine Überprüfung der eingehenden Accumulator durch, um zu sehen, ob wir sogar teure Operationen durchführen müssen - wenn nicht, dann gebe ich einfach den Akkumulator zurück.

Dies ist offensichtlich eine gute Lösung für kleine (er) Listen, aber riesige Listen verursachen unnötige Kosten für den Besuch von Stromelementen, die ich vermeiden möchte.

Hier ist eine Codeskizze - nehmen Sie nur serielle Reduktionen an.

class Accumulator { 
    private final Set<A> setA = new HashSet<>; 
    private final Set<B> setB = new HashSet<>; 
} 

class ResultSupplier implements Supplier<Result> { 

    private final List<Integer> ids; 

    @Override 
    public Result get() { 
     Accumulator acc = ids.stream().reduce(new Accumulator(), f(), (x, y) -> null); 

     return (acc.setA.size > 1) ? Result.invalid() : Result.valid(acc.setB); 
    } 

    private static BiFunction<Accumulator, Integer, Accumulator> f() { 
     return (acc, element) -> { 
      if (acc.setA.size() <= 1) { 
       // perform expensive ops and accumulate results 
      } 
      return acc; 
     }; 
    } 
} 

Neben mit der ganzen Stream, gibt es eine andere Tatsache Ich mag nicht zu durchqueren - ich zweimal den gleichen Zustand überprüfen (nämlich setA Größe Prüfung).

Ich habe map() und collect() Operationen in Betracht gezogen, aber sie schienen nur mehr von der gleichen und nicht finden, sie ändern wesentlich die Tatsache, dass ich nur die Falte-Operation beenden kann, ohne den gesamten Stream zu untersuchen.

Darüber hinaus ist mein Denken, dass imaginäre takeWhile(p : (A) => boolean) Stream API-Korrespondent uns auch nichts kaufen würde, da die Abschlussbedingung auf den Akku, nicht Stream-Elemente per se abhängt.

Denken Sie daran, ich bin ein relativer Neuling in FP so - gibt es eine Möglichkeit, dies zu tun, wie ich es erwarte? Habe ich das ganze Problem falsch aufgesetzt oder ist das eine Einschränkung?

+2

Ihre gesamte Verwendung von 'reduce' ist falsch, da Sie einen veränderbaren Container verwenden. Sie sollten 'collect' dafür verwenden. – Holger

+0

Wie gesagt, ich habe das auch versucht - ich habe meinen'Akkumulator' einen 'IntConsumer' und später einen vollwertigen' Collector' gemacht, aber ich konnte keinen Weg sehen, die Vollstromuntersuchung zu unterbrechen. Kannst du weiter darauf hinweisen, was zu tun ist? – quantum

+0

Verwandte (vielleicht sogar duplizieren, aber nicht streng): http://stackoverflow.com/questions/20746429/java-8-limit-infinite-stream-by-a-predicate – Marco13

Antwort

6

Statt mit ids.stream() des Startens können Sie

  1. Verwendung ids.spliterator()
  2. Wrap resultierende spliterator in benutzerdefinierte spliterator, die ein flüchtiges boolean Flag hat
  3. die tryAdvance Rückkehr des benutzerdefinierten spliterator haben false, wenn das Flag geändert wird
  4. Verwandeln Sie Ihren benutzerdefinierten Spliterator in einen Stream mit StreamSupport.stream(Spliterator<T>, boolean)
  5. weiterhin Stream-Pipeline nach wie vor
  6. fahren Sie den Strom nach unten durch den Booleschen Makeln, wenn der Akkumulator

einige statische Hilfsmethoden hinzufügen voll ist es funktionsfähig zu halten.

die resultierende API über dieses

Accumulator acc = terminateableStream(ids, (stream, terminator) -> 
    stream.reduce(new Accumulator(terminator), f(), (x, y) -> null)); 

Zusätzlich aussehen könnte, ist mein Denken, dass imaginäre Takewhile (p: (A) => boolean) Stream API Korrespondent würde uns auch nichts kaufen

Es funktioniert, wenn die Bedingung vom Akkumulatorstatus und nicht von den Stream-Mitgliedern abhängig ist. Das ist im Wesentlichen der Ansatz, den ich oben skizziert habe.

Es wäre wahrscheinlich in einer takeWhile von der JDK verboten, aber eine benutzerdefinierte Implementierung mit Spliteratoren ist frei, eine Stateful-Ansatz zu nehmen.

+0

Hm, das ist ein interessanter Vorschlag - es klingt ein bisschen zu aufwendig und low-level dafür, aber ich werde es versuchen. – quantum

5

Natürlich wird es eine interessante, reine FP-Antwort geben, die dazu beitragen könnte, dieses Problem so zu lösen, wie Sie es beabsichtigen.

In der Zwischenzeit, warum FP überhaupt verwenden, wenn die einfache Lösung pragmatisch zwingend ist und Ihre ursprüngliche Datenquelle ist sowieso eine List, die bereits vollständig materialisiert ist, und Sie werden serielle Reduktion, nicht parallele Reduktion verwenden. Schreiben Sie stattdessen:

@Override 
public Result get() { 
    Accumulator acc = new Accumulator(); 

    for (Integer id : ids) { 
     if (acc.setA.size() <= 1) { 
      // perform expensive ops and accumulate results 
     } 

     // Easy: 
     if (enough) 
      break; 
    } 

    return (acc.setA.size > 1) ? Result.invalid() : Result.valid(acc.setB); 
} 
+2

Was ist die reine FP-Antwort? :) – ZhongYu

+0

@ bayou.io: Jemand anderes kann das finden. Ich bin zu faul und pragmatisch :) ... obwohl ich bleibe und lerne, ob diese Antwort auftauchen wird. –

+0

Einverstanden und so war der Code vorher strukturiert, aber ich möchte das in reiner FP-Mode lösen. Ich sehe einfach nicht, wie ich das erreichen kann. – quantum

3

Wie in den Kommentaren erwähnt: Das Einsatzszenario klingt ein wenig zweifelhaft. Einerseits wegen der Verwendung von reduce anstelle von collect, andererseits aufgrund der Tatsache, dass der Zustand, der zum Stoppen der Reduktion verwendet werden sollte, auch im Akkumulator erscheint. Es klingt wie einfach Begrenzen der Strom auf eine bestimmte Anzahl von Elementen oder basierend auf einer Bedingung, wie in another question gezeigt, kann hier passender sein.

Natürlich kann es in der realen Anwendung sein, dass die Bedingung tatsächlich nicht mit der Anzahl der Elemente zusammenhängt, die verarbeitet wurden. Für diesen Fall skizzierte ich hier eine Lösung, die im Wesentlichen der answer by the8472 entspricht und der Lösung aus der oben genannten Frage sehr ähnlich ist: Sie verwendet eine Stream, die aus einer Spliterator erstellt wird, die einfach an die ursprüngliche Spliterator delegiert, es sei denn das Anhalten Bedingung ist erfüllt.

import java.util.ArrayList; 
import java.util.Collections; 
import java.util.HashSet; 
import java.util.List; 
import java.util.Set; 
import java.util.Spliterator; 
import java.util.Spliterators; 
import java.util.function.BiFunction; 
import java.util.function.Consumer; 
import java.util.function.Supplier; 
import java.util.stream.Stream; 
import java.util.stream.StreamSupport; 

public class StopStreamReduction 
{ 
    public static void main(String[] args) 
    { 
     ResultSupplier r = new ResultSupplier(); 
     System.out.println(r.get()); 
    } 
} 

class Accumulator 
{ 
    final Set<Integer> set = new HashSet<Integer>(); 
} 

class ResultSupplier implements Supplier<String> 
{ 
    private final List<Integer> ids; 
    ResultSupplier() 
    { 
     ids = new ArrayList<Integer>(Collections.nCopies(20, 1)); 
    } 

    public String get() 
    { 
     //return getOriginal(); 
     return getStopping(); 
    } 

    private String getOriginal() 
    { 
     Accumulator acc = 
      ids.stream().reduce(new Accumulator(), f(), (x, y) -> null); 
     return (acc.set.size() > 11) ? "invalid" : String.valueOf(acc.set); 
    } 

    private String getStopping() 
    { 
     Spliterator<Integer> originalSpliterator = ids.spliterator(); 
     Accumulator accumulator = new Accumulator(); 
     Spliterator<Integer> stoppingSpliterator = 
      new Spliterators.AbstractSpliterator<Integer>(
       originalSpliterator.estimateSize(), 0) 
      { 
       @Override 
       public boolean tryAdvance(Consumer<? super Integer> action) 
       { 
        return accumulator.set.size() > 10 ? false : 
         originalSpliterator.tryAdvance(action); 
       } 
      }; 
     Stream<Integer> stream = 
      StreamSupport.stream(stoppingSpliterator, false); 
     Accumulator acc = 
      stream.reduce(accumulator, f(), (x, y) -> null); 
     return (acc.set.size() > 11) ? "invalid" : String.valueOf(acc.set); 
    } 

    private static int counter = 0; 
    private static BiFunction<Accumulator, Integer, Accumulator> f() 
    { 
     return (acc, element) -> { 

      System.out.print("Step " + counter); 
      if (acc.set.size() <= 10) 
      { 
       System.out.print(" expensive"); 
       acc.set.add(counter); 
      } 
      System.out.println(); 
      counter++; 
      return acc; 
     }; 
    } 
} 

bearbeiten als Reaktion auf die Kommentare:

Natürlich ist es möglich, es "funktioneller" zu schreiben. Aufgrund der vagen Beschreibungen in den Fragen und des eher "skizzenhaften" Codebeispiels ist es jedoch schwierig, hier die "geeignetste" Lösung zu finden. (Und "angemessen" bezieht sich auf die spezifischen Vorbehalte der eigentlichen Aufgabe, und auf die Frage von how functional it should be ohne die Lesbarkeit zu opfern).

Mögliche Funktionalisierungsschritten könnte die Schaffung einer generischen StoppingSpliterator Klasse umfassen, die auf einem Delegierten arbeitet Spliterator und Supplier<Boolean> als Stoppzustand hat, und Fütterung dies mit einem Predicate auf dem tatsächlichen Accumulator, zusammen mit ein paar Gebrauchs Methoden und Verfahren Referenzen hier und da.

Aber noch einmal: Es ist fraglich, ob dies tatsächlich eine geeignete Lösung ist, oder ob man nicht vielmehr sollte die einfache und pragmatische Lösung von the answer by Lukas Eder verwenden ...

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.Collections; 
import java.util.HashSet; 
import java.util.List; 
import java.util.Set; 
import java.util.Spliterator; 
import java.util.Spliterators; 
import java.util.function.BiFunction; 
import java.util.function.Consumer; 
import java.util.function.Predicate; 
import java.util.function.Supplier; 
import java.util.stream.StreamSupport; 

public class StopStreamReduction 
{ 
    public static void main(String[] args) 
    { 
     List<Integer> collection = 
      new ArrayList<Integer>(Collections.nCopies(20, 1)); 
     System.out.println(compute(collection)); 
    } 

    private static String compute(List<Integer> collection) 
    { 
     Predicate<Accumulator> stopCondition = (a) -> a.set.size() > 10; 
     Accumulator result = reduceStopping(collection, 
      new Accumulator(), StopStreamReduction::accumulate, stopCondition); 
     return (result.set.size() > 11) ? "invalid" : String.valueOf(result.set); 
    } 

    private static int counter; 
    private static Accumulator accumulate(Accumulator a, Integer element) 
    { 
     System.out.print("Step " + counter); 
     if (a.set.size() <= 10) 
     { 
      System.out.print(" expensive"); 
      a.set.add(counter); 
     } 
     System.out.println(); 
     counter++; 
     return a; 
    } 

    static <U, T> U reduceStopping(
     Collection<T> collection, U identity, 
     BiFunction<U, ? super T, U> accumulator, 
     Predicate<U> stopCondition) 
    { 
     // This assumes that the accumulator always returns 
     // the identity instance (with the accumulated values). 
     // This may not always be true! 
     return StreamSupport.stream(
      new StoppingSpliterator<T>(
       collection.spliterator(), 
       () -> stopCondition.test(identity)), false). 
        reduce(identity, accumulator, (x, y) -> null); 
    } 
} 

class Accumulator 
{ 
    final Set<Integer> set = new HashSet<Integer>(); 
} 

class StoppingSpliterator<T> extends Spliterators.AbstractSpliterator<T> 
{ 
    private final Spliterator<T> delegate; 
    private final Supplier<Boolean> stopCondition; 

    StoppingSpliterator(Spliterator<T> delegate, Supplier<Boolean> stopCondition) 
    { 
     super(delegate.estimateSize(), 0); 
     this.delegate = delegate; 
     this.stopCondition = stopCondition; 
    } 

    @Override 
    public boolean tryAdvance(Consumer<? super T> action) 
    { 
     if (stopCondition.get()) 
     { 
      return false; 
     } 
     return delegate.tryAdvance(action); 
    } 
} 
+0

Nur zur Information - Ihre Vermutung ist richtig: das Stoppen der Reduktion steht in keinem Zusammenhang mit der Anzahl der besuchten Stream-Elemente und kann in der Tat nicht a priori abgeleitet werden, ohne den Zustand "Akkumulator" zu prüfen. Genau deshalb hatte ich den Verdacht, dass mit meinem Lösungsdesign etwas nicht stimmte, und ich hoffte, dass jemand eine "mehr-FP" -Lösung skizzieren oder einen Hinweis geben würde. Auf jeden Fall, danke für deine Mühe und deine Codeskizze, werde ich es so schnell wie möglich ausprobieren und sehen, wie es aussieht. – quantum

+0

Das ist einige nette funktionale Programmierung genau dort. Jetzt verstehe ich, warum sie sagen, dass es so prägnant ist;) –

+2

Ich bin ziemlich sicher, dass es im Aussehen kürzer und funktioneller gemacht werden könnte. – the8472

0

Ich denke, es ist möglich, eine werfen RuntimeException des speziellen Typs von Ihrem benutzerdefinierten Kollektor (oder Operation zu reduzieren), die das Ergebnis innerhalb des Ausnahmeobjekts enthält und es außerhalb der collect Operation abwickeln, die das Ergebnis auspackt.Ich weiß, dass das Verwenden der Ausnahme für nicht-außergewöhnlichen Kontrollfluss nicht idiomatisch ist, aber es sollte in Ihrem Fall sogar für parallele Ströme funktionieren.

Tatsächlich gibt es viele Fälle, in denen eine Kurzschlussreduktion nützlich sein könnte. Sammeln Sie beispielsweise die Enum-Werte auf EnumSet (Sie können stoppen, sobald Sie feststellen, dass alle möglichen Enum-Werte bereits erfasst sind). Oder schneiden Sie alle Elemente von Stream<Set> (Sie können stoppen, wenn die resultierende Menge nach einem Schritt leer ist: Fortsetzung der Reduktion ist nutzlos). Intern gibt es ein SHORT_CIRCUIT Flag, das in Stream-Operationen wie findFirst verwendet wird, aber es ist nicht der öffentlichen API ausgesetzt.

+1

Wie sollten Wurf-Ausnahmen in parallelen Szenarien funktionieren? – Holger

+0

@Holger: Es wird von 'ForkJoinTask.join' erneut ausgelöst und andere Aufgaben werden so schnell wie möglich abgebrochen. Siehe diesen Proof-of-Concept [gist] (https://gist.github.com/amaembo/c0e24c7dbc9f1728e638). Es löst eine weitere beliebte Aufgabe: Überprüfen Sie, ob die Summe nichtnegativer Zahlen größer ist als das Limit (Sie können die Summierung stoppen, sobald das Limit erreicht ist). –

+1

"so schnell wie möglich" bedeutet nicht "sofort", so dass aktuell ausgeführte Akkumulatoren möglicherweise noch vervollständigt werden. Das ist in Ordnung, wenn Sie nur nach einer Bedingung suchen möchten, aber nicht, wenn Sie * ein Limit erzwingen möchten. Mit anderen Worten, es kann das Problem des OP nicht lösen. Aber es funktioniert auch nicht mit Ihrem Beispiel: alle Threads summieren sich zu einer lokalen Summe unter dem Limit und wenn der Überlauf im Combiner erkannt wird, ist die Arbeit aller Threads bereits abgeschlossen, so dass das Werfen nicht wirklich gespeichert wird etwas. – Holger

3

Es gibt keine echte FP-Lösung, einfach weil Ihr gesamter Akku nicht FP ist. Wir können Ihnen in dieser Hinsicht nicht helfen, da wir nicht wissen, was es tatsächlich macht. Alles, was wir sehen, ist, dass es sich auf zwei veränderbare Sammlungen stützt und daher nicht Teil einer reinen FP-Lösung sein kann.

Wenn Sie die Einschränkungen akzeptieren und dass es keine sauber Weg, um die Stream API verwenden Sie für die einfache Weise streben könnten. Der einfache Weg beinhaltet eine Stateful Predicate, die sich um nicht das Beste ist, aber manchmal nicht zu vermeiden:

public Result get() { 
    int limit = 1; 
    Set<A> setA=new HashSet<>(); 
    Set<B> setB=new HashSet<>(); 
    return ids.stream().anyMatch(i -> { 
     // perform expensive ops and accumulate results 
     return setA.size() > limit; 
    })? Result.invalid(): Result.valid(setB); 
} 

Aber ich mag, dass angesichts Ihre spezifische Logik beachten, das heißt, dass Ihr Ergebnis ungültig betrachtet wird, wenn der Satz auch wächst groß, Ihr Versuch der Verarbeitung nicht zu viel Elemente ist eine Optimierung des fehlerhaften Falles. Sie sollten keine Mühe verschwenden, dies zu optimieren. Wenn ein gültiges Ergebnis das Ergebnis der Verarbeitung aller Elemente ist, dann verarbeite alle Elemente ...

1

Ich stimme allen vorherigen Antworten zu. Sie tun es falsch, indem Sie eine Reduzierung auf einen veränderbaren Akku erzwingen. Der Prozess, den Sie beschreiben, kann auch nicht als eine Pipeline von Transformationen und Reduktionen ausgedrückt werden.

Wenn Sie wirklich, wirklich müssen es tun, FP-Stil, würde ich tun, wie @ the8472 weist darauf hin.

Wie dem auch sei, gebe ich Ihnen eine neue kompaktere Alternative, ähnlich wie @ lukas-eder-Lösung unter Verwendung eines Iterator:

Function<Integer, Integer> costlyComputation = Function.identity(); 

Accumulator acc = new Accumulator(); 

Iterator<Integer> ids = Arrays.asList(1, 2, 3).iterator(); 

while (!acc.hasEnough() && ids.hasNext()) 
    costlyComputation.andThen(acc::add).apply(ids.next()); 

Sie haben zwei unterschiedliche Anliegen FP bezüglich hier:

Wie stop iterating

Da Sie auf veränderlichen Zustand angewiesen sind, wird FP nur Ihr Leben härter machen. Sie können die Sammlung extern iterieren oder einen Iterator verwenden, wie ich es vorschlage.

Verwenden Sie dann ein if(), um die Iteration zu stoppen.

Sie können verschiedene Strategien haben, aber am Ende des Tages, das ist, was Sie verwenden.

Ich bevorzuge den Iterator, weil idiomatischer ist (drückt Ihre Absicht in diesem Fall besser aus).

Wie Akkumulator und kostspieliger Vorgang für mich

Dies ist das interessanteste zu entwerfen.

Eine reine Funktion kann keinen Zustand haben, muss etwas empfangen und muss etwas zurückgeben, und immer dasselbe für die gleiche Eingabe (wie eine mathematische Funktion). Können Sie Ihre kostspielige Operation so ausdrücken?

Benötigt es einen gemeinsamen Status mit dem Akkumulator? Vielleicht gehört das Teilen keinem von beiden.

Umwandeln Sie Ihre Eingabe und fügen Sie sie dann in den Akkumulator ein, oder liegt das in der Verantwortung des Akkumulators? Macht es Sinn, die Funktion in den Accumulator zu injizieren?