2015-03-13 6 views
8

Ich benutze kafka 0.8.1.1 auf einer Red Hat VM mit kafka-net plugin. Wie kann ich meinen Kunden so konfigurieren, dass er keine früheren Nachrichten von kafka mehr empfängt?Konfiguriere kafka-net um das Senden der letzten Nachrichten zu beenden

Mein Verbraucher Code:

var options = new KafkaOptions(new Uri("tcp://199.53.249.150:9092"), new Uri("tcp://199.53.249.151:9092")); 

Stopwatch sp = new Stopwatch(); 
var router = new BrokerRouter(options); 
var consumer = new Consumer(new ConsumerOptions("Test", router)); 

ThreadStart start2 =() => 
{ 
    while (true) 
    { 
     sp.Start(); 
     foreach (var message in consumer.Consume()) 
     { 
      if (MessageDecoderReceiver.MessageBase(message.Value) != null) 
      { 
       PrintMessage(MessageDecoderReceiver.MessageBase(message.Value).ToString()); 
      } 
      else 
      { 
       Console.WriteLine(message.Value); 
      } 
     } 
     sp.Stop(); 
    } 
}; 
var thread2 = new Thread(start2); 
thread2.Start(); 
+0

Ich habe ein Tag für die verwendete Sprache hinzugefügt und den Text und den Titel optimiert, um die Lesbarkeit zu verbessern. Ich habe auch [einen Titel aus dem Titel entfernt] (http://meta.stackexchange.com/questions/19190/should-questions-include-tags-in-the-titles). –

Antwort

11

Der Verbraucher in Kafka-net Auto verfolgt derzeit nicht die Offsets verbraucht wird. Sie müssen die Offset-Verfolgung manuell implementieren.

die in Kafkas Version Offset So speichern 0.8.1:

var commit = new OffsetCommitRequest 
      { 
       ConsumerGroup = consumerGroup, 
       OffsetCommits = new List<OffsetCommit> 
          { 
           new OffsetCommit 
            { 
             PartitionId = partitionId, 
             Topic = IntegrationConfig.IntegrationTopic, 
             Offset = offset, 
             Metadata = metadata 
            } 
          } 
      }; 

var commitResponse = conn.Connection.SendAsync(commit).Result.FirstOrDefault(); 

den Verbraucher einzustellen, bei einem bestimmten Offset-Punkt zu starten importieren:

var offsets = consumer.GetTopicOffsetAsync(IntegrationConfig.IntegrationTopic).Result 
        .Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray(); 

var consumer = new Consumer(new ConsumerOptions(IntegrationConfig.IntegrationTopic, router), offsets); 

Beachten Sie den obigen Code wird der Verbraucher festgelegt am Ende des Logs zu konsumieren, effektiv nur neue Nachrichten zu empfangen.

+0

Super, das ist was ich brauche, vielen Dank! –

+0

Könnten Sie etwas Licht in die Frage bringen, wie Sie den Offset, den Sie begangen haben, verbrauchen? Wenn GetTopicOffsetAsync des Clients abgerufen wird, werden die festgeschriebenen Offsets nicht zurückgegeben. Es gibt nur 2 Werte zurück: 0 (vermutlich der erste) und der letzte Offset. Auch für andere Leser, die Verbindung der KafkaConnectionFactory Methode KafkaOptions mit bekommen: '_options.KafkaConnectionFactory.Create (uri, _options.ResponseTimeoutMs, _options.Log)' – arviman

+0

Nvm ich durch die Quelle gegraben und fand die OffsetFetchRequest \ Antwortklasse. Es sieht also so aus, als müssten wir alle Partitionen mit GetTopicOffset holen und dann den gespeicherten Offset für jede Partition mit dem OffsetFetchRequest abrufen, dessen Antwort dann in ConsumerOptions an den Consumer übergeben wird. – arviman