2016-04-24 19 views
14

Mein Hazelcast-basiertes Programm kann in zwei Modi arbeiten: Übergeber und Arbeiter.Weird Hazelcat IMap # Put() Verhalten

Einreicher setzt einige POJO der verteilten Karte durch eine Taste, z.B .: hazelcastInstance.getMap(MAP_NAME).put(key, value);

Arbeiter hat eine Endlosschleife (mit Thread.sleep(1000L); innen für timeout), die Einheiten aus der Karte verarbeiten müssen. Momentan drucke ich nur die Kartengröße in dieser Schleife.

Jetzt ist hier das Problem. Ich starte die Worker App. Dann beginne ich vier Übergeber gleichzeitig (jeder fügt einen Eintrag zur Karte hinzu und beendet seine Arbeit). Aber nachdem alle Übermittler-Apps fertig sind, druckt die Worker-App eine beliebige Größe: manchmal erkennt sie, dass nur ein Eintrag hinzugefügt wurde, manchmal zwei, manchmal sogar drei (tatsächlich hat er nie alle vier Einträge gesehen).

Was ist das Problem mit diesem einfachen Fluss? Ich habe in Hazelcast Dokumentation gelesen, dass put() Methode ist synchron, so dass es garantiert, dass, nachdem es zurückgibt, wird Eintrag auf verteilte Karte platziert und wird repliziert. Aber es scheint nicht so in meinem Experiment.

UPD (Code)

Einreicher:

public void submit(String key) { 
    Object mySerializableObject = ... 
    IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME); 
    map.putIfAbsent(key, mySerializableObject, TASK_TTL_IN_HOURS, TimeUnit.HOURS); 
} 

Worker:

public void process() { 
    while (true) { 
     IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME); 
     System.out.println(map.size()); 

     // Optional<Map.Entry<String, Object>> objectToProcess = getObjectToProcess(); 
     // objectToProcess.ifPresent(objectToProcess-> processObject(id, objectToProcess)); 
     try { 
      Thread.sleep(PAUSE); 
     } catch (InterruptedException e) { 
      LOGGER.error(e.getMessage(), e); 
     } 
    } 
} 

bemerkte ich aus "Verarbeitung" Teil selbst, denn jetzt bin ich nur zu bekommen versuchen, konsistenter Zustand der Karte Der obige Code gibt jedes Mal verschiedene Ergebnisse aus, z. B .: "4, 3, 1, 1, 1, 1, 1 ..." (so dass 4 Aufgaben für einen Moment angezeigt werden können, aber dann ... verschwinden) .

UPD (log)

Worker:

... 
tasksMap.size() = 0 
tasksMap.size() = 0 
tasksMap.size() = 0 
tasksMap.size() = 0 
tasksMap.size() = 1 
tasksMap.size() = 2 
tasksMap.size() = 2 
tasksMap.size() = 2 
tasksMap.size() = 2 
tasksMap.size() = 2 
... 

Einreicher 1:

Before: tasksMap.size() = 0 
After: tasksMap.size() = 1 

Einreicher 2:

Before: tasksMap.size() = 1 
After: tasksMap.size() = 4 

submi tter 3:

Before: tasksMap.size() = 1 
After: tasksMap.size() = 2 

Submitter 4:

Before: tasksMap.size() = 3 
After: tasksMap.size() = 4 
+0

Die IMap :: size-Methode ist eine Schätzung, sollte sich aber irgendwann stabilisieren. Können Sie etwas mehr Code teilen? – noctarius

+0

@noctarius, ich habe die Frage aktualisiert. –

+0

Besteht der Code aus eingebetteten Elementen und werden sie tatsächlich nach dem Senden des Werts angehalten? Ich könnte mir vorstellen, dass die Mitglieder den Cluster schnell verlassen, um die Backup-Anforderungen auf Urlaub zu erfüllen. – noctarius

Antwort

7

Nun, ich denke, ich habe das Problem gelöst. Soweit ich verstehe, verteilt IMap von hazelcastInstance.getMap zurückgegeben wird, garantiert nicht, dass Daten über alle vorhandenen Knoten im Cluster repliziert werden: einige Teile der Daten können auf einige Knoten repliziert werden, ein anderer Teil - auf einen anderen Knoten. Aus diesem Grund wurden in meinem Beispiel einige der übertragenen Aufgaben nicht auf den Worker-Knoten repliziert (der fortwährend funktioniert), sondern auf einige andere übergebende Benutzer, die ihre Ausführung nach der Übergabe beenden. Solche Einträge gingen also beim Absenden des Absenders verloren.

Ich löste dieses Problem, indem ich hazelcastInstance.getMap zu hazelcastInstance.getReplicatedMap ersetzte.Diese Methode gibt ReplicatedMap zurück, wobei AFAIK garantiert, dass die darin platzierten Einträge in alle Knoten des Clusters repliziert werden. Jetzt funktioniert alles in meinem System.