In meinem Beispielprogramm versuche ich eine Datei zu veröffentlichen und versuche, diese sofort zu verbrauchen. Aber mein Consumer-Iterator gibt null zurück. Irgendeine Idee, was ich falsch mache?Kafka-Verbraucher gibt leeren Iterator zurück
-Test
**main(){**
KafkaMessageProducer producer = new KafkaMessageProducer(topic, file);
producer.generateMessgaes();
MessageListener listener = new MessageListener(topic);
listener.start();
}
Message
public void start() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(CoreConstants.THREAD_SIZE));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
for (KafkaStream<byte[], byte[]> stream : streams) {
System.out.println("The stream is --"+ stream.iterator().makeNext().topic());
executor.submit(new ListenerThread(stream));
}
try { // without this wait the subsequent shutdown happens immediately before any messages are delivered
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
if (consumerConnector != null) {
consumerConnector.shutdown();
}
if (executor != null) {
executor.shutdown();
}
}
ListenerThread
public class ListenerThread implements Runnable {
private KafkaStream<byte[], byte[]> stream;
public ListenerThread(KafkaStream<byte[], byte[]> msgStream) {
this.stream = msgStream;
System.out.println("----------" + stream.iterator().makeNext().topic());
}
public void run() {
try {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
// MessageAndMetadata<byte[], byte[]> messageAndMetadata =
// it.makeNext();
// String topic = messageAndMetadata.topic();
// byte[] message = messageAndMetadata.message();
System.out.println("111111111111111111111111111");
FileProcessor processor = new FileProcessor();
processor.processFile("LOB_TOPIC", it.next().message());
}
in der darüber Erator geht nicht in die while-Schleife, da der Iterator null ist. Aber ich bin sicher, dass ich eine einzelne Nachricht zu demselben Thema veröffentliche und der Verbraucher hört auf dieses Thema.
Jede Hilfe wäre
Ich habe das gleiche Problem. Ich kann sehen, dass die Nachrichten im Consumer-Terminal ankommen, aber meine App zeigt jetzt einen "leeren Iterator". Es arbeitete und begann plötzlich zu passieren. Ich würde mich freuen, wenn jemand weiß warum. – Mhoque