2016-06-24 16 views
22

ich eine benutzerdefinierte Java 8 Sammler zu schreiben, die angeblich den Durchschnitt einer POJO zu berechnen, die eine getValue() Methode hat. Hier ist der Code:Java 8 Stream Combiner genannt nie

public static Collector<BoltAggregationData, BigDecimal[], BigDecimal> avgCollector = new Collector<BoltAggregationData, BigDecimal[], BigDecimal>() { 

     @Override 
     public Supplier<BigDecimal[]> supplier() { 
      return() -> { 
       BigDecimal[] start = new BigDecimal[2]; 
       start[0] = BigDecimal.ZERO; 
       start[1] = BigDecimal.ZERO; 
       return start; 
      }; 
     } 

     @Override 
     public BiConsumer<BigDecimal[], BoltAggregationData> accumulator() { 
      return (a,b) -> { 
       a[0] = a[0].add(b.getValue()); 
       a[1] = a[1].add(BigDecimal.ONE); 
      }; 
     } 

     @Override 
     public BinaryOperator<BigDecimal[]> combiner() { 
      return (a,b) -> { 
       a[0] = a[0].add(b[0]); 
       a[1] = a[1].add(b[1]); 
       return a; 
      }; 
     } 

     @Override 
     public Function<BigDecimal[], BigDecimal> finisher() { 
      return (a) -> { 
       return a[0].divide(a[1], 6 , RoundingMode.HALF_UP); 
      }; 
     } 

     private final Set<Characteristics> CHARACTERISTICS = new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED)); 

     @Override 
     public Set<Characteristics> characteristics() { 
      return CHARACTERISTICS; 
     } 

    }; 

Es funktioniert alles gut im nicht parallelen Fall. Wenn ich jedoch eine parallelStream() verwende, funktioniert es manchmal nicht. Wenn man beispielsweise die Werte von 1 bis 10 berechnet, berechnet es (53/9 statt 55/10). Beim Debuggen trifft der Debugger niemals den Breakpoint in der Funktion combiner(). Gibt es eine Art Flagge, die ich setzen muss?

+0

Ich habe beide upvote, vielen Dank für Ihre Antwort auch :) Ich habe gerade die andere Antwort irgendwie klarer. Danke auch für den Tipp zum EnumSet. –

+1

Das ist ok, ich habe gerade bemerkt, dass Sie akzeptiert (oder versucht zu akzeptieren) in einem kurzen Zeitintervall beide, so wollte ich nur eine mögliche Verwirrung löschen. – Holger

+0

Hinweis gibt es viel bessere Möglichkeiten, dies zu tun, zum Beispiel eines [kumulativen gleitenden Durchschnitt] (https://en.wikipedia.org/wiki/Moving_average). –

Antwort

22

Es ist wie das Problem aussieht, ist die CONCURRENT Eigenschaft, die etwas anderes tut, als Sie es vielleicht denken würde:

Zeigt an, dass dieser Kollektor gleichzeitige ist, dass der Behälter Ergebnis bedeutet den Speicher unterstützen kann Funktion, die gleichzeitig mit dem gleichen Ergebnis Behälter aus mehreren Threads aufgerufen.

Statt des Kombinierers Aufruf, der Akkumulator gleichzeitig aufgerufen wird, die gleiche für alle Threads BigDecimal[] a verwenden. Der Zugriff auf a ist nicht atomar, so dass es schief geht:

Thread1 -> retrieves value of a[0]: 3 
Thread2 -> retrieves value of a[0]: 3 
Thread1 -> adds own value: 3 + 3 = 6 
Thread2 -> adds own value: 3 + 4 = 7 
Thread1 -> writes 6 to a[0] 
Thread2 -> writes 7 to a[0] 

macht den Wert von a[0] 7, wenn es 10. Die gleiche Art der Sache sein sollte, mit a[1] passieren kann, so dass die Ergebnisse widersprüchlich sein können.


Wenn Sie die CONCURRENT Merkmal entfernen, wird der Kombinierer statt gewöhnen.

+0

Wäre es zu tun, anstatt eine 'AtomicInteger' (oder' AtomicLong') anstelle eines 'BigInteger' zu benutzen? Könnte das 'CONCURRENT'-Merkmal dann verwendet werden? – dcsohl

+2

@dcsohl Während des Tests fand ich, dass die mit den Leitungen in dem Akkumulator mit '' 'synchronisiert (this) {...}' '' auch das Problem gelöst, umgibt. Aber meine Intuition besagt, dass die Verwendung dieses Merkmals nicht erzwungen werden sollte, sondern vielmehr verwendet werden soll, wenn der Ergebnis-Container gleichzeitige Operationen auf beliebige Weise unterstützt. –

+1

@dcsohl: natürlich, so dass die Akku-Funktion Thread-sicher kann das Problem lösen, wie das ist, was das 'CONCURRENT' Merkmal bedeutet, dass diese Funktion Thread ist sicher. Es wird jedoch auch ein Leistungsvorteil der gleichzeitigen Bewertung gegenüber lokaler Akkumulation und Zusammenführung vorgeschlagen, was hier nicht der Fall ist (dies ist selten der Fall). – Holger

18

Nun, das ist genau das, was Sie fordern, wenn Characteristics.CONCURRENT Angabe:

Zeigt an, dass dieser Kollektor gleichzeitige ist, was bedeutet, dass das Ergebnis Behälter den Akkumulator-Funktion aufgerufen, die gleichzeitig mit dem gleichen Ergebnis Behälter aus mehreren unterstützen Fäden.

Wenn das nicht der Fall ist, wie Sie mit Ihrem Collector, sollten Sie dieses Flag nicht angeben.


Als Randbemerkung, new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED)); ist ziemlich ineffizient für Merkmale angegeben werden. Sie können einfach EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED) verwenden. Wenn Sie die falsche gleichzeitige Merkmal entfernen, können Sie entweder EnumSet.of(Characteristics.UNORDERED) oder Collections.singleton(Characteristics.UNORDERED), verwenden aber einen HashSet definitiv viel des Guten ist.