2016-06-21 13 views
2

Ich benutze kafka-Knoten, um Nachrichten von einem bestimmten Kafka-Thema zu konsumieren. Wenn ich meinen Node-Server neu starte, wird mein Consumer wie erwartet gestartet, aber das Standardverhalten ist, dass ich ab Offset 0 konsumiere, während mein Ziel ist, nur neue Nachrichten zu erhalten (aka Startkonsum vom aktuellen Offset). Ich habe keinen Weg gefunden, dies über die API-Dokumentation zu erreichen. Weiß jemand, ob es unterstützt wird?Kafka-Knoten Start Verbrauch von letzten Offset

Danke!

Antwort

4

fragte ich diese Frage in kafka-Knoten Github Fragen (link) und bekam eine Antwort. Es ist jetzt verfügbar (ab v0.4.0). Das folgende Snippet hat für mich funktioniert:

consumerClient = new kafka.Client('localhost:2181'); 

/* Print latest offset. */ 
var offset = new kafka.Offset(consumerClient); 

offset.fetch([{ topic: 'myTopic', partition: 0, time: -1 }], function (err, data) { 
     var latestOffset = data['myTopic']['0'][0]; 
     console.log("Consumer current offset: " + latestOffset); 
}); 

var consumer = new kafka.HighLevelConsumer(
     consumerClient, 
     [ 
      { topic: 'myTopic', partition: 0, fromOffset: -1 } 
     ], 
     { 
      autoCommit: false 
     } 
); 

Prost!

-1

Wenn Sie will nur neue Nachrichten erhalten, müssen Sie die folgende Eigenschaft vor der Erstellung von Verbraucher Instanz setzen: auto.offset.reset = neueste

+0

Wo soll ich das machen? – ItayB

+0

/* Bevor Sie KafkaConsumer Instance erstellen, müssen Sie die Eigenschaft festlegen. */ props.setProperty ("auto.offset.reset", "latest");/* frühestes, spätestes */ KafkaConsumer Verbraucher = neuer KafkaConsumer <> (Stützen); – Hussain

+0

Sind Sie sicher, dass Sie über die JavaScript-API (node ​​js) sprechen? Sieht aus wie C++ – ItayB