2016-04-02 20 views
0

Dies ist meine Verbraucher-Code überprüfen:Wie Verbraucherdaten mit Kafka Java api

public class KafkaConsumer { 

    private ConsumerConnector consumerConnector = null; 
    private final String topic = "JsonTopic"; 

    public void initialize() { 
      Properties props = new Properties(); 
      props.put("zookeeper.connect", "localhost:2181"); 
      props.put("group.id", "testgroup"); 
      props.put("kafka.topic", "JsonTopic"); 
      props.put("zookeeper.session.timeout.ms", "400"); 
      props.put("zookeeper.sync.time.ms", "300"); 
      props.put("auto.commit.interval.ms", "1000"); 

      ConsumerConfig conConfig = new ConsumerConfig(props); 
      consumerConnector = Consumer.createJavaConsumerConnector(conConfig); 
    } 

    public void consume() throws IOException { 
      Map<String, Integer> topicCount = new HashMap<String, Integer>();  
      topicCount.put(topic, new Integer(1)); 

      //ConsumerConnector creates the message stream for each topic 
      Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =consumerConnector.createMessageStreams(topicCount);   

      List<KafkaStream<byte[], byte[]>> kStreamList = consumerStreams.get(topic); 

      for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) { 


       ConsumerIterator<byte[], byte[]> consumerItem = kStreams.iterator(); 

       while (consumerItem.hasNext()) 
       { 
        System.out.println("Message consumed from topic[" + topic + "] : " + new String(consumerItem.next().message())); 
        // writeToFile(new String(consumerItem.next().message()),"/root/abc.txt"); 
       } 

      } 
      //Shutdown the consumer connector 
      if (consumerConnector != null) 
       consumerConnector.shutdown();   
    } 

Server starten:

./kafka-server-start.sh /usr/hdp/2.3.0.0-2557/etc/kafka/conf.default/server.properties 

starten prodcuer:

[[email protected] bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic JsonTopic 
HI 
Hello 

ich nicht bekommen jede Ausgabe in der Konsole.

[[email protected] ~]# java -jar kafkaconsumer.jar 
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). 
log4j:WARN Please initialize the log4j system properly. 

Antwort

1

standardmäßig, wenn ein Verbraucher gestartet wird, es ignoriert alle vorhandenen Daten in dem Thema Kafka und wird nur neue Nachricht verbrauchen kommt in nachdem der Verbraucher gestartet. Basierend auf Ihrer Beschreibung der Reihenfolge der Ereignisse haben Sie Ihren Konsolenhersteller gestartet, Nachrichten geschrieben und den Kafka-Benutzer gestartet. Somit wird keine Nachricht aufgenommen. Sie können versuchen, einige Nachrichten zu senden, nachdem der Verbraucher gestartet wurde. Alternativ können Sie den Consumer konfigurieren, indem Sie "auto.offset.reset" auf "früheste" für den neuen Consumer in 0.9 und "kleinste" für den alten Consumer festlegen. Dadurch wird sichergestellt, dass der Konsument Nachrichten bereits im Kafka-Thema abholt, bevor er startet.

+0

Dies ist nicht die Lösung – Aman