Ich verwende einen benutzerdefinierten Kafka Deserializer, der ein Objekt aus JSON marshallt.Gibt es eine Möglichkeit, unbegrenzte Wiederholungen für benutzerdefinierte Kafka Deserializer zu stoppen?
val props = Map(
"bootstrap.servers" -> kafkaQueue.kafkaHost,
"group.id" -> adapterServer.adapterConfig.kafkaGroup,
"enable.auto.commit" -> "false",
"max.poll.records" -> "1",
"auto.offset.reset" -> "earliest",
"auto.commit.interval.ms" -> "1000",
"key.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"value.deserializer" -> "com.mystuff.CustomJSONDeserializer"
)
new KafkaConsumer[Array[Byte], MyMessage](props)
Eine Sache, die ich gesehen habe, ist, dass, wenn jemand schlecht JSON zum Thema postet, Kafka versucht es mit meinen benutzerdefinierten Deserializer deserialisieren - und kann es nicht. Der CustomJSONDeserializer löst eine Ausnahme aus, aber Kafka versucht es einfach weiter.
So dreht es sich einfach unendlich versuchen, die schlechte JSON-Version deserialisieren, im Wesentlichen stecken bleiben. Da dies alles innerhalb von Kafka geschieht, bin ich mir nicht sicher, wie ich es stoppen und ihm sagen kann, dass es mit der nächsten Nachricht weitergehen soll.
Wie kann ich dies vermeiden?