Ich habe einige ernste Probleme, eine Lösung für meine Bedürfnisse zu implementieren, in Bezug auf KafkaConsumer (> = 0.9).Kafka Consumer - Umfrageverhalten
Stellen wir uns vor, ich habe eine Funktion, die nur n Nachrichten von einem Kafka-Thema lesen muss.
Zum Beispiel: getMsgs(5)
->ruft die nächsten 5 kafka Nachrichten in Thema.
Also, ich habe eine Schleife, die wie folgt aussieht:
for (boolean exit= false;!exit;)
{
Records = consumer.poll(200);
for (Record r:records) {
processRecord(r); //do my things
numMss++;
if (numMss==maximum) //maximum=5
exit=true;
}
}
dies unter Berücksichtigung, ist das Problem, dass die Umfrage() -Methode mehr als 5 Meldungen bekommen kann. Zum Beispiel, wenn es 10 Nachrichten bekommt, wird mein Code für immer diese anderen 5 Nachrichten vergessen, da Kafka denkt, dass sie bereits verbraucht sind.
Ich habe versucht, den Offset commiting aber scheint nicht zu funktionieren:
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(record.offset() + 1)));
Selbst, mit der Offset-Konfiguration, wenn ich wieder den Verbraucher zu starten, es vom 6. Nachricht nicht (starten erinnere mich, ich wollte nur 5 Nachrichten), aber von der 11. (seit der ersten Umfrage verbraucht 10 Nachrichten).
Gibt es eine Lösung dafür, oder vielleicht (am sichersten) fehlt mir etwas?
Vielen Dank im Voraus!
auto.offset.reset sollte frühestens und es tritt nur ein, wenn es keine Verbraucher group.id gibt. Ohne Gruppen-ID kann man keine Offsets speichern. Wenn es bereits eine Kundengruppen-ID gibt, wird auto.offset.reset nichts tun, und standardmäßig wählt der Verbraucher den letzten festgeschriebenen Offset aus. – user1870400