2016-07-29 11 views
0

Hier ist mein kafka Nachricht Produzent:Verbrauchen nur bestimmte partion Nachricht

 ProducerRecord producerRecord = new ProducerRecord(topic, "k1", message); 
     producer.send(producerRecord); 

hier ist mein Verbraucher

TopicPartition partition0 = new TopicPartition(topic, 0); 
consumer.assign(Arrays.asList(partition0)); 
    final int minBatchSize = 200; 
    List<ConsumerRecord<String, byte[]>> buffer = new ArrayList<>(); 
    while (true) { 
     ConsumerRecords<String, byte[]> records = consumer.poll(100); 
     for (ConsumerRecord<String, byte[]> record : records) { 
      buffer.add(record); 
      System.out.println(record.key() + "KEY: " + record.value()); 

Wie ist es möglich, nur topic Nachricht k1 als Trennwand Schlüssel zu verbrauchen

+0

Mit "Partition Key' k1' "denke ich, dass Sie eigentlich" Nachrichtenschlüssel 'k1'" meinen. Kafka wählt irgendwelche Schlüssel für Sie aus. Wenn Sie eine Partition konsumieren, müssen Sie jede Nachricht von dieser Partition konsumieren. Ob Sie etwas damit machen oder es einfach fallenlassen, wenn es nicht den richtigen Schlüssel hat, liegt ganz bei Ihnen. – Harald

+0

Sie sagen, dass beim Empfangen von Nachrichten ich tun soll 'if (msg.key(). Equals ("k1") ' – manish

+0

Ja, das ist der Weg zu gehen. – Harald

Antwort

1

Die einzige Möglichkeit, ein solches Verhalten zu implementieren, besteht darin, die Anzahl der Partitionen == Anzahl der möglichen Schlüssel zu haben und einen benutzerdefinierten Partitionierer zu verwalten Schlüsseleindeutigkeit für eine Partition (Standard-Hash-Partitionierer würde funktionieren, denke ich). Aber diese Lösung ist sehr weit von optimal und ich kann es nicht empfehlen. Darüber hinaus können Sie nicht in Mechanismus gebaut verwenden ein ähnliches Verhalten zu erreichen - Sie Nachrichten filtern müssen auf Client-Seite

+0

nicht Sie denke, dass diese Funktion von kafka nicht gut ist, zB ich poste eine Nachricht zu 'topic' und alle 'topic'-Subskribenten erhalten eine Nachricht, obwohl sie nicht dazu gedacht war, sie zu senden cos filtering sollte auf der Client-Seite basierend auf dem Nachrichtenschlüssel sein – manish

0

Ein Vorschlag ist die partition und offset Ihrer spezifischen Nachricht zu erinnern, und mit assign und seek , poll in Verbraucherseite. (Auch Verbraucher max.poll.records = 1, die eine Nachricht auf einmal abrufen).

  • zuweisen, bestimmte Partition zuweisen;
  • suchen, auf bestimmten Offset suchen, dann erhält nächste Umfrage Ihre erwartete Nachricht K1.

Hinweis: Es funktioniert wie "zufällige" Suche, wird aber die Nachrichtenverbrauchsleistung reduzieren.
0.10 neue Verbraucher und neue Konfiguration max.poll.records sind erforderlich.