Dieser Code gibt mir alle Nachrichten von Anfang an und für eine andere Nachricht warten und manchmal ist es einfach für eine andere Nachricht wartetWie bekomme ich alle Nachrichten von kafka topic und zähle sie mit Java? manchmal
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
public class TestConsumer{
public static void main(String[] args) {
ConsumerConfig config;
Properties props = new Properties();
props.put("zookeeper.connect","sandbox.hortonworks.com:2181");
props.put("group.id", "group-4");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "200");
config = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector
(config);
String topic = "News";
System.out.println("Running");
Run(consumer,topic);
}
public static void Run(ConsumerConnector consumer,String topic){
HashMap<String,Integer> topicCountMap =
new HashMap<String,Integer>();
topicCountMap.put(topic, 1);
Map<String,List<KafkaStream<byte[],byte[]>>>
consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[],byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[],byte[]> it = stream.iterator();
List<String> msgTopicList = new ArrayList<String>();
int count = 0;
System.out.println("Waiting");
while(it.hasNext()){
MessageAndMetadata<byte[],byte[]> msgAndData = it.next();
String msg = new String(msgAndData.message());
msgTopicList.add(msg);
String key = "NoKey";
System.out.println(msg);
count++;
}
}
}
Was ich tun müssen, ist erhalten alle Nachrichten aus dem Thema sie die gesendeten Benutzer und zählen sie
Was ist der beste Weg, dies zu tun?
Version kafka_2.10-0.8.1.2.2.4.2-2
Haben Sie immer von Anfang des Themas lesen? Wenn ja, sollten Sie props.put ("auto.offset.reset", "smallest") verwenden; und Auto-Commit deaktivieren: props.put ("enable.auto.commit", false); –