2016-05-09 18 views
0

Dieser Code gibt mir alle Nachrichten von Anfang an und für eine andere Nachricht warten und manchmal ist es einfach für eine andere Nachricht wartetWie bekomme ich alle Nachrichten von kafka topic und zähle sie mit Java? manchmal

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import kafka.message.MessageAndMetadata; 

public class TestConsumer{ 

public static void main(String[] args) { 
    ConsumerConfig config; 
    Properties props = new Properties(); 
    props.put("zookeeper.connect","sandbox.hortonworks.com:2181"); 
    props.put("group.id", "group-4"); 
    props.put("zookeeper.session.timeout.ms", "400"); 
    props.put("zookeeper.sync.time.ms", "200"); 
    props.put("auto.commit.interval.ms", "200"); 
    config = new ConsumerConfig(props); 
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector 
      (config); 
    String topic = "News"; 
    System.out.println("Running"); 
    Run(consumer,topic); 
} 

public static void Run(ConsumerConnector consumer,String topic){ 
    HashMap<String,Integer> topicCountMap = 
      new HashMap<String,Integer>(); 
    topicCountMap.put(topic, 1); 
    Map<String,List<KafkaStream<byte[],byte[]>>> 
    consumerMap = consumer.createMessageStreams(topicCountMap); 
    KafkaStream<byte[],byte[]> stream = consumerMap.get(topic).get(0); 
    ConsumerIterator<byte[],byte[]> it = stream.iterator(); 
    List<String> msgTopicList = new ArrayList<String>(); 
    int count = 0; 
    System.out.println("Waiting"); 
    while(it.hasNext()){ 
     MessageAndMetadata<byte[],byte[]> msgAndData = it.next(); 
     String msg = new String(msgAndData.message()); 
     msgTopicList.add(msg); 
     String key = "NoKey"; 
     System.out.println(msg); 
     count++; 
    } 
} 
} 

Was ich tun müssen, ist erhalten alle Nachrichten aus dem Thema sie die gesendeten Benutzer und zählen sie

Was ist der beste Weg, dies zu tun?

Version kafka_2.10-0.8.1.2.2.4.2-2

+0

Haben Sie immer von Anfang des Themas lesen? Wenn ja, sollten Sie props.put ("auto.offset.reset", "smallest") verwenden; und Auto-Commit deaktivieren: props.put ("enable.auto.commit", false); –

Antwort

0

Hier ist dein Beispiel.

Das wichtigste hier ist Kafka Verbraucher Konfigurationseigenschaften:

von Anfang der Warteschlange beginnen.

props.put("auto.offset.reset", "smallest"); 

Speichert keine Offsets für diesen Verbraucher.

props.put("auto.commit.enable", "false"); 

Warten Sie 5 Sekunden auf Nachrichten, wenn keine weiteren Nachrichten verfügbar sind.

props.put("consumer.timeout.ms", "5000"); 

Das ganze Beispiel:

package com.xxx.yyy.zzz; 

import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 

import kafka.consumer.Consumer; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerTimeoutException; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 

public class KafkaConsumer { 
    private final ConsumerConnector consumer; 
    private final String topic; 
    private int count = 0; 

    public KafkaConsumer(final String zookeeper, final String groupId, final String topic) { 
     this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId)); 
     this.topic = topic; 
    } 

    // Initialize connection properties to Kafka and Zookeeper 
    private static ConsumerConfig createConsumerConfig(final String zookeeper, final String groupId) { 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", zookeeper); 
     props.put("group.id", groupId); 
     props.put("zookeeper.session.timeout.ms", "2000"); 
     props.put("zookeeper.sync.time.ms", "250"); 
     props.put("auto.commit.interval.ms", "1000"); 
     props.put("auto.offset.reset", "smallest"); 
     props.put("auto.commit.enable", "false"); 
     props.put("consumer.timeout.ms", "5000"); 

     return new ConsumerConfig(props); 
    } 

    private void getData() { 
     List<byte[]> msgs = new ArrayList(); 
     Map<String, Integer> topicMap = new HashMap<>(); 

     // Define single thread for topic 
     topicMap.put(topic, 1); 
     try { 
      Map<String, List<KafkaStream<byte[], byte[]>>> listMap = consumer.createMessageStreams(topicMap); 
      List<KafkaStream<byte[], byte[]>> kafkaStreams = listMap.get(topic); 

      // Collect the messages. 
      kafkaStreams.forEach(ks -> ks.forEach(mam -> msgs.add(mam.message()))); 

     } catch (ConsumerTimeoutException exception) { 
      // There no more messages available -> so, we are done. 

      // Now print all your messages 
      msgs.forEach(System.out::println); 

      // count them 
      count = msgs.size(); 
     } finally { 
      if (consumer != null) { 
       consumer.shutdown(); 
      } 
     } 
    } 
}