Mir ist Kafkas Aussage bekannt, dass KafkaConsumer nicht Thread-sicher ist.Wie kann ich ConcurrentModificationException Fehler in Kafka beheben? (0.9.0.1)
Also tat ich dies: (Scala)
val m = Map(new TopicPartition(msg.topic(), msg.partition()) -> new OffsetAndMetadata(msg.offset()))
consumer.synchronized{ consumer.commitSync(m) }
ich meinen Zugang zum Verbraucher in einem synchronisierten Block setzen, aber ich habe noch ConcurrentModificationException Fehler auf der Linie mit consumer.commitSync (m) erhalten.
Warum und was kann ich dagegen tun?
Ich benutze Akka-Streams, also gibt es bestimmt Threads-Geheimnisse unter der Decke, aber sollte nicht der synchronisierte Block dafür sorgen?
Ich vermute, Sie haben mehrere Instanzen von 'Consumer', so dass die Synchronisation nicht funktioniert, wie Sie' synchronisierte' auf verschiedenen Objekten aufrufen ... Sie benötigen ein global gemeinsam genutztes Objekt (dh eine einzelne Instanz eines Dummy 'Objekts') dass jeder Consumer-Thread verwendet wird, um die Sperre zu erhalten. –