Ich bin ein neuer Student, der Kafka studiert, und ich bin auf einige grundsätzliche Probleme mit dem Verständnis mehrerer Verbraucher gestoßen, dass Artikel, Dokumentationen usw. bisher nicht sehr hilfreich waren.Wie verwende ich mehrere Konsumenten in Kafka?
Eine Sache, die ich versucht habe, ist, meinen eigenen Kafka-Produzenten und -Konsumenten auf hohem Niveau zu schreiben und sie gleichzeitig zu betreiben, 100 einfache Nachrichten zu einem Thema zu veröffentlichen und meinen Kunden diese abrufen zu lassen. Ich habe es erfolgreich geschafft, aber wenn ich versuche, einen zweiten Konsumenten einzuführen, der von demselben Thema konsumiert, auf dem gerade Nachrichten veröffentlicht wurden, erhält er keine Nachrichten.
Es war mein Verständnis, dass Sie für jedes Thema, Verbraucher aus separaten Verbrauchergruppen haben könnte und jede dieser Verbrauchergruppen eine vollständige Kopie der Nachrichten zu einem Thema erhalten würde. Ist das richtig? Wenn nicht, was wäre der richtige Weg für mich, mehrere Verbraucher zu gründen? Dies ist die Consumer-Klasse, die ich bisher geschrieben habe:
public class AlternateConsumer extends Thread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private final Boolean isAsync = false;
public AlternateConsumer(String topic, String consumerGroup) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", consumerGroup);
properties.put("partition.assignment.strategy", "roundrobin");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<Integer, String>(properties);
consumer.subscribe(topic);
this.topic = topic;
}
public void run() {
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(0);
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
}
}
}
}
Außerdem bemerkte ich, dass ursprünglich war ich für ein Thema ‚Test‘ mit nur einer einzigen Partition des obigen Verbrauch zu testen. Als ich einer bestehenden Verbrauchergruppe einen weiteren Verbraucher hinzufügte, sagen wir "testGroup", löste dies eine Kafka-Neuverteilung aus, die die Latenz meines Verbrauchs um einen beträchtlichen Betrag in der Größenordnung von Sekunden verlangsamte. Ich dachte, dass dies ein Problem mit dem Rebalancing ist, da ich nur eine einzige Partition hatte, aber als ich ein neues Thema 'multiplepartitions' mit sage 6 Partitionen erstellte, traten ähnliche Probleme auf, wenn das Hinzufügen von mehr Konsumenten zu derselben Consumer-Gruppe Latenzprobleme verursachte. Ich habe mich umgeschaut und die Leute sagen mir, ich sollte einen Multithread-Konsumenten benutzen - kann irgendjemand das aufklären?
Es gibt ein großartiges Beispiel für einen High-End-Consumer [hier] (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Beispiel) für Kafka '0.8.1'. – chrsblck
@chrsblck danke für den Link.Ich habe das schon einmal untersucht und wahrscheinlich nicht so gut verstanden, wie ich es könnte - könnten Sie vielleicht ein wenig erklären, wie dieses Beispiel die Threads nutzt? Ich verstehe nicht ganz, was sie gerade machen. –
Eine Möglichkeit besteht darin, die gleiche Anzahl von Threads wie Partitionen für ein bestimmtes Thema zu haben. Aus dem Artikel - Schnappen Sie sich eine Liste von Streams 'List> streams = consumerMap.get (topic);' ... Dann weisen Sie jedem Thread eine Partition 'executor.submit (new ConsumerTest (stream, threadNumber)) '. –
chrsblck