2015-06-17 5 views
20

Ich bin ein neuer Student, der Kafka studiert, und ich bin auf einige grundsätzliche Probleme mit dem Verständnis mehrerer Verbraucher gestoßen, dass Artikel, Dokumentationen usw. bisher nicht sehr hilfreich waren.Wie verwende ich mehrere Konsumenten in Kafka?

Eine Sache, die ich versucht habe, ist, meinen eigenen Kafka-Produzenten und -Konsumenten auf hohem Niveau zu schreiben und sie gleichzeitig zu betreiben, 100 einfache Nachrichten zu einem Thema zu veröffentlichen und meinen Kunden diese abrufen zu lassen. Ich habe es erfolgreich geschafft, aber wenn ich versuche, einen zweiten Konsumenten einzuführen, der von demselben Thema konsumiert, auf dem gerade Nachrichten veröffentlicht wurden, erhält er keine Nachrichten.

Es war mein Verständnis, dass Sie für jedes Thema, Verbraucher aus separaten Verbrauchergruppen haben könnte und jede dieser Verbrauchergruppen eine vollständige Kopie der Nachrichten zu einem Thema erhalten würde. Ist das richtig? Wenn nicht, was wäre der richtige Weg für mich, mehrere Verbraucher zu gründen? Dies ist die Consumer-Klasse, die ich bisher geschrieben habe:

public class AlternateConsumer extends Thread { 
    private final KafkaConsumer<Integer, String> consumer; 
    private final String topic; 
    private final Boolean isAsync = false; 

    public AlternateConsumer(String topic, String consumerGroup) { 
     Properties properties = new Properties(); 
     properties.put("bootstrap.servers", "localhost:9092"); 
     properties.put("group.id", consumerGroup); 
     properties.put("partition.assignment.strategy", "roundrobin"); 
     properties.put("enable.auto.commit", "true"); 
     properties.put("auto.commit.interval.ms", "1000"); 
     properties.put("session.timeout.ms", "30000"); 
     properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); 
     properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     consumer = new KafkaConsumer<Integer, String>(properties); 
     consumer.subscribe(topic); 
     this.topic = topic; 
    } 


    public void run() { 
     while (true) { 
      ConsumerRecords<Integer, String> records = consumer.poll(0); 
      for (ConsumerRecord<Integer, String> record : records) { 
       System.out.println("We received message: " + record.value() + " from topic: " + record.topic()); 
      } 
     } 

    } 
} 

Außerdem bemerkte ich, dass ursprünglich war ich für ein Thema ‚Test‘ mit nur einer einzigen Partition des obigen Verbrauch zu testen. Als ich einer bestehenden Verbrauchergruppe einen weiteren Verbraucher hinzufügte, sagen wir "testGroup", löste dies eine Kafka-Neuverteilung aus, die die Latenz meines Verbrauchs um einen beträchtlichen Betrag in der Größenordnung von Sekunden verlangsamte. Ich dachte, dass dies ein Problem mit dem Rebalancing ist, da ich nur eine einzige Partition hatte, aber als ich ein neues Thema 'multiplepartitions' mit sage 6 Partitionen erstellte, traten ähnliche Probleme auf, wenn das Hinzufügen von mehr Konsumenten zu derselben Consumer-Gruppe Latenzprobleme verursachte. Ich habe mich umgeschaut und die Leute sagen mir, ich sollte einen Multithread-Konsumenten benutzen - kann irgendjemand das aufklären?

+0

Es gibt ein großartiges Beispiel für einen High-End-Consumer [hier] (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Beispiel) für Kafka '0.8.1'. – chrsblck

+0

@chrsblck danke für den Link.Ich habe das schon einmal untersucht und wahrscheinlich nicht so gut verstanden, wie ich es könnte - könnten Sie vielleicht ein wenig erklären, wie dieses Beispiel die Threads nutzt? Ich verstehe nicht ganz, was sie gerade machen. –

+0

Eine Möglichkeit besteht darin, die gleiche Anzahl von Threads wie Partitionen für ein bestimmtes Thema zu haben. Aus dem Artikel - Schnappen Sie sich eine Liste von Streams 'List > streams = consumerMap.get (topic);' ... Dann weisen Sie jedem Thread eine Partition 'executor.submit (new ConsumerTest (stream, threadNumber)) '. – chrsblck

Antwort

17

Ich denke, Ihr Problem liegt in der Eigenschaft auto.offset.reset. Wenn ein neuer Kunde von einer Partition liest und es keinen zuvor festgeschriebenen Offset gibt, wird mit der Eigenschaft auto.offset.reset entschieden, wie der Startoffset aussehen soll. Wenn Sie es auf "Größt" (Standard) setzen, beginnen Sie mit dem Lesen der letzten (letzten) Nachricht. Wenn Sie es auf "Kleinste" setzen, erhalten Sie die erste verfügbare Nachricht.

So fügen:

properties.put("auto.offset.reset", "smallest"); 

und versuchen Sie es erneut.

+1

Dies ist eine späte Antwort, aber danke Chris! Ihre Lösungen sind korrekt und nach genauerer Betrachtung einiger Dokumentationen sollte ich bemerkt haben, dass beim Start eines neuen Verbrauchers nur die neuesten gesendeten Nachrichten verwendet werden - NICHT bereits existierende, außer wenn die oben genannten Eigenschaften eingestellt sind. –

4

In der Dokumentation here heißt es: "Wenn Sie mehr Threads zur Verfügung stellen, als es Partitionen zum Thema gibt, werden einige Threads nie eine Nachricht sehen". Können Sie Ihrem Thema Partitionen hinzufügen? Ich habe meine Consumer-Gruppen-Thread-Anzahl gleich der Anzahl der Partitionen in meinem Thema, und jeder Thread erhält Nachrichten.

Hier ist mein Thema config:

buffalo-macbook10:kafka_2.10-0.8.2.1 aakture$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic recent-wins 
Topic:recent-wins PartitionCount:3 ReplicationFactor:1 Configs: 
Topic: recent-wins Partition: 0 Leader: 0 Replicas: 0 Isr: 0 
Topic: recent-wins Partition: 1 Leader: 0 Replicas: 0 Isr: 0 
Topic: recent-wins Partition: 2 Leader: 0 Replicas: 0 Isr: 0 

Und mein Verbraucher:

package com.cie.dispatcher.services; 

import com.cie.dispatcher.model.WinNotification; 
import com.fasterxml.jackson.databind.ObjectMapper; 
import com.google.inject.Inject; 
import io.dropwizard.lifecycle.Managed; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

/** 
* This will create three threads, assign them to a "group" and listen for notifications on a topic. 
* Current setup is to have three partitions in Kafka, so we need a thread per partition (as recommended by 
* the kafka folks). This implements the dropwizard Managed interface, so it can be started and stopped by the 
* lifecycle manager in dropwizard. 
* <p/> 
* Created by aakture on 6/15/15. 
*/ 
public class KafkaTopicListener implements Managed { 
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicListener.class); 
private final ConsumerConnector consumer; 
private final String topic; 
private ExecutorService executor; 
private int threadCount; 
private WinNotificationWorkflow winNotificationWorkflow; 
private ObjectMapper objectMapper; 

@Inject 
public KafkaTopicListener(String a_zookeeper, 
          String a_groupId, String a_topic, 
          int threadCount, 
          WinNotificationWorkflow winNotificationWorkflow, 
          ObjectMapper objectMapper) { 
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
      createConsumerConfig(a_zookeeper, a_groupId)); 
    this.topic = a_topic; 
    this.threadCount = threadCount; 
    this.winNotificationWorkflow = winNotificationWorkflow; 
    this.objectMapper = objectMapper; 
} 

/** 
* Creates the config for a connection 
* 
* @param zookeeper the host:port for zookeeper, "localhost:2181" for example. 
* @param groupId the group id to use for the consumer group. Can be anything, it's used by kafka to organize the consumer threads. 
* @return the config props 
*/ 
private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) { 
    Properties props = new Properties(); 
    props.put("zookeeper.connect", zookeeper); 
    props.put("group.id", groupId); 
    props.put("zookeeper.session.timeout.ms", "400"); 
    props.put("zookeeper.sync.time.ms", "200"); 
    props.put("auto.commit.interval.ms", "1000"); 

    return new ConsumerConfig(props); 
} 

public void stop() { 
    if (consumer != null) consumer.shutdown(); 
    if (executor != null) executor.shutdown(); 
    try { 
     if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { 
      LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); 
     } 
    } catch (InterruptedException e) { 
     LOG.info("Interrupted during shutdown, exiting uncleanly"); 
    } 
    LOG.info("{} shutdown successfully", this.getClass().getName()); 
} 
/** 
* Starts the listener 
*/ 
public void start() { 
    Map<String, Integer> topicCountMap = new HashMap<>(); 
    topicCountMap.put(topic, new Integer(threadCount)); 
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 
    executor = Executors.newFixedThreadPool(threadCount); 
    int threadNumber = 0; 
    for (final KafkaStream stream : streams) { 
     executor.submit(new ListenerThread(stream, threadNumber)); 
     threadNumber++; 
    } 
} 

private class ListenerThread implements Runnable { 
    private KafkaStream m_stream; 
    private int m_threadNumber; 

    public ListenerThread(KafkaStream a_stream, int a_threadNumber) { 
     m_threadNumber = a_threadNumber; 
     m_stream = a_stream; 
    } 

    public void run() { 
     try { 
      String message = null; 
      LOG.info("started listener thread: {}", m_threadNumber); 
      ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
      while (it.hasNext()) { 
       try { 
        message = new String(it.next().message()); 
        LOG.info("receive message by " + m_threadNumber + " : " + message); 
        WinNotification winNotification = objectMapper.readValue(message, WinNotification.class); 
        winNotificationWorkflow.process(winNotification); 
       } catch (Exception ex) { 
        LOG.error("error processing queue for message: " + message, ex); 
       } 
      } 
      LOG.info("Shutting down listener thread: " + m_threadNumber); 
     } catch (Exception ex) { 
      LOG.error("error:", ex); 
     } 
    } 
    } 
} 
+0

Können Sie bitte das Beispiel für die Version Kafka 1.0 teilen, da die meisten der im obigen Beispiel verwendeten Klassen veraltet sind. –

+0

Ich glaube nicht, dass es zu der Zeit draußen war, ich werde vielleicht nicht bald meinen Code aktualisieren, Entschuldigung. –

4

Wenn Sie mehr Verbraucher wollen gleiche Nachrichten zu konsumieren (wie eine Broadcast), können Sie sie mit verschiedener Verbrauchergruppe laichen Außerdem wird auto.offset.reset in der Consumer-Konfiguration auf den kleinsten Wert gesetzt. Wenn Sie möchten, dass mehrere Verbraucher parallel verbrauchen (teilen Sie die Arbeit unter ihnen), sollten Sie Anzahl der Partitionen> = Anzahl der Verbraucher erstellen. Eine Partition kann nur von höchstens einem Verbraucherprozess verbraucht werden. Ein Consumer kann jedoch mehr als eine Partition beanspruchen.