2016-03-28 14 views
1

In meinem Beispielprogramm versuche ich eine Datei zu veröffentlichen und versuche, diese sofort zu verbrauchen. Aber mein Consumer-Iterator gibt null zurück. Irgendeine Idee, was ich falsch mache?Kafka-Verbraucher gibt leeren Iterator zurück

-Test

**main(){** 

KafkaMessageProducer producer = new KafkaMessageProducer(topic, file); 
     producer.generateMessgaes(); 

     MessageListener listener = new MessageListener(topic); 
     listener.start(); 
} 

Message

public void start() { 



     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(topic, new Integer(CoreConstants.THREAD_SIZE)); 

     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector 
       .createMessageStreams(topicCountMap); 

     List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 
     executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE); 

     for (KafkaStream<byte[], byte[]> stream : streams) { 
      System.out.println("The stream is --"+ stream.iterator().makeNext().topic()); 
      executor.submit(new ListenerThread(stream));  
     } 
     try { // without this wait the subsequent shutdown happens immediately before any messages are delivered 
      Thread.sleep(10000); 
     } catch (InterruptedException ie) { 

     } 
     if (consumerConnector != null) { 
      consumerConnector.shutdown(); 
     } 
     if (executor != null) { 
      executor.shutdown(); 
     } 
    } 

ListenerThread

public class ListenerThread implements Runnable { 
     private KafkaStream<byte[], byte[]> stream; 

     public ListenerThread(KafkaStream<byte[], byte[]> msgStream) { 
      this.stream = msgStream; 
      System.out.println("----------" + stream.iterator().makeNext().topic()); 
     } 

public void run() { 
     try { 

      ConsumerIterator<byte[], byte[]> it = stream.iterator(); 
      while (it.hasNext()) { 
       // MessageAndMetadata<byte[], byte[]> messageAndMetadata = 
       // it.makeNext(); 
       // String topic = messageAndMetadata.topic(); 
       // byte[] message = messageAndMetadata.message(); 
       System.out.println("111111111111111111111111111"); 
       FileProcessor processor = new FileProcessor(); 
       processor.processFile("LOB_TOPIC", it.next().message()); 
      } 

in der darüber Erator geht nicht in die while-Schleife, da der Iterator null ist. Aber ich bin sicher, dass ich eine einzelne Nachricht zu demselben Thema veröffentliche und der Verbraucher hört auf dieses Thema.

Jede Hilfe wäre

+0

Ich habe das gleiche Problem. Ich kann sehen, dass die Nachrichten im Consumer-Terminal ankommen, aber meine App zeigt jetzt einen "leeren Iterator". Es arbeitete und begann plötzlich zu passieren. Ich würde mich freuen, wenn jemand weiß warum. – Mhoque

Antwort

1

ich gestern das gleiche Problem hatte geschätzt. Nachdem ich eine Weile damit gearbeitet habe, konnte ich es nicht von meinem aktuellen Thema lesen. Also habe ich folgende Schritte gemacht:

a. Gestoppt mein Verbraucher,

b. stoppte den Hersteller,

c. stoppte den kafka-server bin/zoekeeper-server-stop.sh config/zoekeeper.properties

d. stoppte den Tierpfleger bin/zoekeeper-server-stop.sh config/zoekeeper.properties

Danach habe ich mein Thema gelöscht. bin/kafka-topics.sh --delete --zookeeper localhost: 2181 --Topic-Test

Ich löschte auch die Dateien, die erstellt wurden, indem ich dem "Einrichten eines Multi-Broker-Clusters" folgte, aber ich nicht denke, dass es das Problem verursacht hat.

a. Begann den Zookeeper b. begann kafka c. Produzent gestartet und senden Sie einige Nachrichten an Kafka

es begann wieder zu arbeiten. Ich bin mir nicht sicher, ob dir das hilft oder nicht. Aber irgendwie scheint mein Produzent vom Verbraucher getrennt worden zu sein. Hoffe das hilft.

+0

Ich weiß nicht, wie ich dieses Problem überwunden habe ... Irgendwann ging es weg. Aber ich habe jetzt ein ähnliches Problem, das ich in der Kafka-Benutzer-Mailing-Liste veröffentlicht habe. Thema "Wenn mein Produzent produziert, dann warum der Verbraucher nicht konsumieren konnte? Es steckte @ Umfrage()" – Ratha