6

Ich habe einige ernste Probleme, eine Lösung für meine Bedürfnisse zu implementieren, in Bezug auf KafkaConsumer (> = 0.9).Kafka Consumer - Umfrageverhalten

Stellen wir uns vor, ich habe eine Funktion, die nur n Nachrichten von einem Kafka-Thema lesen muss.

Zum Beispiel: getMsgs(5) ->ruft die nächsten 5 kafka Nachrichten in Thema.

Also, ich habe eine Schleife, die wie folgt aussieht:

for (boolean exit= false;!exit;) 
{ 
    Records = consumer.poll(200); 
    for (Record r:records) { 
     processRecord(r); //do my things 
     numMss++; 
     if (numMss==maximum) //maximum=5 
      exit=true; 
    } 
} 

dies unter Berücksichtigung, ist das Problem, dass die Umfrage() -Methode mehr als 5 Meldungen bekommen kann. Zum Beispiel, wenn es 10 Nachrichten bekommt, wird mein Code für immer diese anderen 5 Nachrichten vergessen, da Kafka denkt, dass sie bereits verbraucht sind.

Ich habe versucht, den Offset commiting aber scheint nicht zu funktionieren:

consumer.commitSync(Collections.singletonMap(partition, 
    new OffsetAndMetadata(record.offset() + 1))); 

Selbst, mit der Offset-Konfiguration, wenn ich wieder den Verbraucher zu starten, es vom 6. Nachricht nicht (starten erinnere mich, ich wollte nur 5 Nachrichten), aber von der 11. (seit der ersten Umfrage verbraucht 10 Nachrichten).

Gibt es eine Lösung dafür, oder vielleicht (am sichersten) fehlt mir etwas?

Vielen Dank im Voraus!

Antwort

3

Sie können festlegen, max.poll.records zu was immer du magst, so dass du bei jeder Umfrage höchstens so viele Datensätze bekommst.

Für Ihren Anwendungsfall, den Sie in diesem Problem angegeben haben, müssen Sie keine expliziten Offsets explizit festlegen. Sie können einfach enable.auto.commit auf true setzen und auto.offset.reset auf earliest so einstellen, dass es eintritt, wenn es keinen Verbraucher gibt group.id (mit anderen Worten wenn Sie anfangen, von einer Partition zum allerersten Mal zu lesen). Sobald Sie eine Gruppe haben.ID und einige in Kafka gespeicherte Konsumenten-Offsets und falls Ihr Kafka-Consumer-Prozess abbricht, wird mit dem letzten festgeschriebenen Offset fortgefahren, da dies das Standardverhalten ist. Wenn ein Verbraucher startet, sucht er zuerst nach bestimmten Offsets und wenn ja, wird fortgesetzt von der letzten festgesetzten Offset und auto.offset.resetwird nicht kick in.

0

set auto.offset.reset Eigenschaft als "neueste". Versuchen Sie dann, konsumieren, Sie werden die konsumierten Datensätze aus dem Commit-Offset erhalten.

Oder Sie verwenden consumer.seek (TopicPartition, Offset) API vor Abfrage.

+0

auto.offset.reset sollte frühestens und es tritt nur ein, wenn es keine Verbraucher group.id gibt. Ohne Gruppen-ID kann man keine Offsets speichern. Wenn es bereits eine Kundengruppen-ID gibt, wird auto.offset.reset nichts tun, und standardmäßig wählt der Verbraucher den letzten festgeschriebenen Offset aus. – user1870400

0

Hatten Sie die automatische Festschreibung deaktiviert, indem Sie enable.auto.commit auf false gesetzt haben. Sie müssen das deaktivieren, wenn Sie den Offset manuell festschreiben möchten. Ohne diesen nächsten Aufruf von poll() wird automatisch der letzte Offset der Nachrichten, die Sie von der vorherigen Umfrage erhalten haben, übergeben().

0

Ab Kafka 0.9 haben sich die Parameternamen auto.offset.reset geändert;

Was tun, wenn es keine Anfänge in Kafka oder wenn der Strom auf dem Server nicht mehr existiert nicht Offset Offset ist (zB weil die Daten gelöscht wurden):

earliest: automatically reset the offset to the earliest offset 

latest: automatically reset the offset to the latest offset 

none: throw exception to the consumer if no previous offset is found for the consumer's group 

anything else: throw exception to the consumer.