2016-03-31 5 views
0

Mir ist Kafkas Aussage bekannt, dass KafkaConsumer nicht Thread-sicher ist.Wie kann ich ConcurrentModificationException Fehler in Kafka beheben? (0.9.0.1)

Also tat ich dies: (Scala)

val m = Map(new TopicPartition(msg.topic(), msg.partition()) -> new OffsetAndMetadata(msg.offset())) 
consumer.synchronized{ consumer.commitSync(m) } 

ich meinen Zugang zum Verbraucher in einem synchronisierten Block setzen, aber ich habe noch ConcurrentModificationException Fehler auf der Linie mit consumer.commitSync (m) erhalten.

Warum und was kann ich dagegen tun?

Ich benutze Akka-Streams, also gibt es bestimmt Threads-Geheimnisse unter der Decke, aber sollte nicht der synchronisierte Block dafür sorgen?

+0

Ich vermute, Sie haben mehrere Instanzen von 'Consumer', so dass die Synchronisation nicht funktioniert, wie Sie' synchronisierte' auf verschiedenen Objekten aufrufen ... Sie benötigen ein global gemeinsam genutztes Objekt (dh eine einzelne Instanz eines Dummy 'Objekts') dass jeder Consumer-Thread verwendet wird, um die Sperre zu erhalten. –

Antwort

0

Ein Ansatz in der Dokumentation besteht darin, einen separaten Thread nur für den KafkaConsumer zu erstellen und mit der Außenwelt mit einer Art gleichzeitiger Warteschlange zu kommunizieren.

+0

Dieser Ansatz hat sehr gut funktioniert! – Greg