2015-05-22 1 views
21

Sortierung habe ich eine Record Klasse:Encounter Bestellung falsch, wenn ein Parallelstrom

public class Record implements Comparable<Record> 
{ 
    private String myCategory1; 
    private int myCategory2; 
    private String myCategory3; 
    private String myCategory4; 
    private int myValue1; 
    private double myValue2; 

    public Record(String category1, int category2, String category3, String category4, 
     int value1, double value2) 
    { 
     myCategory1 = category1; 
     myCategory2 = category2; 
     myCategory3 = category3; 
     myCategory4 = category4; 
     myValue1 = value1; 
     myValue2 = value2; 
    } 

    // Getters here 
} 

ich eine große Liste von vielen Datensätzen. Nur die zweiten und fünften Werte, i/10000 und i, werden später von den Gettern getCategory2() bzw. getValue1() verwendet.

List<Record> list = new ArrayList<>(); 
for (int i = 0; i < 115000; i++) 
{ 
    list.add(new Record("A", i/10000, "B", "C", i, (double) i/100 + 1)); 
} 

Beachten Sie, dass zunächst 10.000 Datensätze haben eine category2 von 0, dann neben 10.000 haben 1 usw., während die value1 Werte 0-114999 sequentiell sind.

Ich erstelle ein Stream das ist parallel und sorted.

Stream<Record> stream = list.stream() 
    .parallel() 
    .sorted(
     //(r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2()) 
    ) 
    //.parallel() 
; 

Ich habe ein ForkJoinPool die 8 Fäden hält, was die Anzahl der Kerne, die ich auf meinem PC haben.

ForkJoinPool pool = new ForkJoinPool(8); 

Ich benutze den Trick described here to submit a stream processing task to my own ForkJoinPool instead of the common ForkJoinPool.

List<Record> output = pool.submit(() -> 
    stream.collect(Collectors.toList() 
)).get(); 

I erwartet, dass die parallel sorted Betrieb die Begegnung Reihenfolge des Stromes respektieren würde, und daß es eine stabile sortieren, weil die durch SpliteratorArrayList zurück ORDERED ist.

Allerdings zeigt einfacher Code, der die Elemente der resultierenden Listoutput in der Reihenfolge ausgibt, dass es nicht ganz der Fall ist.

for (Record record : output) 
{ 
    System.out.println(record.getValue1()); 
} 

Ausgang, kondensiert:

0 
1 
2 
3 
... 
69996 
69997 
69998 
69999 
71875 // discontinuity! 
71876 
71877 
71878 
... 
79058 
79059 
79060 
79061 
70000 // discontinuity! 
70001 
70002 
70003 
... 
71871 
71872 
71873 
71874 
79062 // discontinuity! 
79063 
79064 
79065 
79066 
... 
114996 
114997 
114998 
114999 

Die size() von output ist 115000, und alle Elemente erscheinen, dort zu sein, nur in einer etwas anderen Reihenfolge.

Also schrieb ich einen Prüfcode, um zu sehen, ob die sort stabil war. Wenn es stabil ist, sollten alle Werte in der Reihenfolge bleiben. Dieser Code überprüft die Reihenfolge und gibt Abweichungen aus.

int prev = -1; 
boolean verified = true; 
for (Record record : output) 
{ 
    int curr = record.getValue1(); 
    if (prev != -1) 
    { 
     if (prev + 1 != curr) 
     { 
      System.out.println("Warning: " + prev + " followed by " + curr + "!"); 
      verified = false; 
     } 
    } 
    prev = curr; 
} 
System.out.println("Verified: " + verified); 

Ausgang:

Warning: 69999 followed by 71875! 
Warning: 79061 followed by 70000! 
Warning: 71874 followed by 79062! 
Warning: 99999 followed by 100625! 
Warning: 107811 followed by 100000! 
Warning: 100624 followed by 107812! 
Verified: false 

Dieser Zustand hält, wenn ich einen der folgenden Schritte aus:

  • die ForkJoinPool Ersetzen mit einem ThreadPoolExecutor.

    ThreadPoolExecutor pool = new ThreadPoolExecutor(8, 8, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)); 
    
  • die häufig ForkJoinPool durch die Stream direkt verarbeitet.

    List<Record> output = stream.collect(Collectors.toList()); 
    
  • Anruf parallel()nach nenne ich sorted.

    Stream<Record> stream = list.stream().sorted().parallel(); 
    
  • Anruf parallelStream() statt stream().parallel().

    Stream<Record> stream = list.parallelStream().sorted(); 
    
  • Sortieren ein Comparator verwenden. Beachten Sie, dass dieses Sortierkriterium anders ist als die "natürliche" Reihenfolge, die ich für die Schnittstelle Comparable definiert habe, obwohl das Ergebnis, beginnend mit den bereits von Anfang an geordneten Ergebnissen, immer noch dasselbe sein sollte.

    Stream<Record> stream = list.stream().parallel().sorted(
        (r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2()) 
    ); 
    

Ich kann dies nur bekommen die Begegnung, um zu bewahren, wenn ich ein tun nicht der folgenden auf dem Stream:

  • nicht parallel() rufen Sie.
  • Keine Überlastung von sorted anrufen.

Interessanterweise hat die parallel() ohne eine Art die Reihenfolge erhalten.

In beiden oben genannten Fällen ist die Ausgabe:

Verified: true 

Meine Java-Version 1.8.0_05 ist. Diese Anomalie auch occurs on Ideone, die Java 8u25 zu laufen scheint.

aktualisiert

Ich habe meinen JDK auf die neueste Version zum Zeitpunkt des Schreibens aufgerüstet, 1.8.0_45, und das Problem ist unverändert.

Frage

Ist der Datensatz, um in den resultierenden List (output) aus der Ordnung, weil die Art irgendwie nicht stabil ist, weil die Begegnung, um nicht erhalten wird, oder aus einem anderen Grunde?

Wie kann ich sicherstellen, dass die Reihenfolge der Begegnungen erhalten bleibt, wenn ich einen parallelen Stream erzeuge und ihn sortiere?

+6

Ich würde versuchen, das einfachste Programm zu machen, das das Problem reproduziert, es auf der neuesten JDK-Version läuft, und einen Bug einreicht, wenn es reproduziert wird: Die Sorte soll stabil sein: sie ist als solche dokumentiert. –

Antwort

11

Es sieht aus wie Arrays.parallelSort ist unter bestimmten Umständen nicht stabil. Gut beobachtet. Die parallele Stream-Sortierung wird in Form von Arrays.parallelSort implementiert, so dass auch Streams betroffen sind. Hier ist ein vereinfachtes Beispiel:

public class StableSortBug { 
    static final int SIZE = 50_000; 

    static class Record implements Comparable<Record> { 
     final int sortVal; 
     final int seqNum; 

     Record(int i1, int i2) { sortVal = i1; seqNum = i2; } 

     @Override 
     public int compareTo(Record other) { 
      return Integer.compare(this.sortVal, other.sortVal); 
     } 
    } 

    static Record[] genArray() { 
     Record[] array = new Record[SIZE]; 
     Arrays.setAll(array, i -> new Record(i/10_000, i)); 
     return array; 
    } 

    static boolean verify(Record[] array) { 
     return IntStream.range(1, array.length) 
         .allMatch(i -> array[i-1].seqNum + 1 == array[i].seqNum); 
    } 

    public static void main(String[] args) { 
     Record[] array = genArray(); 
     System.out.println(verify(array)); 
     Arrays.sort(array); 
     System.out.println(verify(array)); 
     Arrays.parallelSort(array); 
     System.out.println(verify(array)); 
    } 
} 

Auf meinem Rechner (2 Kern x 2 Threads) Druckt die folgenden:

true 
true 
false 

Natürlich es soll true dreimal drucken. Dies ist auf dem aktuellen JDK 9 Dev Builds.Ich würde nicht überrascht sein, wenn es in allen bisherigen JDK 8 Veröffentlichungen vorkommt, wenn man bedenkt, was Sie versucht haben. Kurioserweise wird sich das Verhalten ändern, wenn Sie die Größe oder den Teiler reduzieren. Eine Größe von 20.000 und ein Teiler von 10.000 ist stabil, und eine Größe von 50.000 und ein Teiler von 1.000 ist ebenfalls stabil. Es scheint, als ob das Problem mit einer ausreichend großen Folge von Werten zu tun hat, die gleiche gegenüber der parallelen Split-Größe vergleichen.

Das OpenJDK-Problem JDK-8076446 deckt diesen Fehler ab.

+4

Es gibt auch https://bugs.openjdk.java.net/browse/JDK-8076446 –

+0

(wahr, wahr, falsch) auch auf Windows7 (64), 8u40. – edharned

+2

@StefanZobel Oh ja, danke, ich habe den neuen Fehler als Duplikat des alten geschlossen. –