Ich muss einen Job in der Nacht ausführen, die alle Nachrichten in einer Kafka-Warteschlange erhalten und einen Prozess mit ihnen ausführen wird. Ich kann die Nachrichten abrufen, aber der Kafka-Stream wartet auf weitere Nachrichten und ich kann meinen Prozess nicht fortsetzen. Ich habe den folgenden Code:Alle Kafka-Nachrichten in eine Warteschlange und stoppen Streaming in Java
...
private ConsumerConnector consumerConnector;
private final static String TOPIC = "test";
public MessageStreamConsumer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", "localhost:2181");
properties.put("group.id", "test-group");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
}
public List<String> getMessages() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(TOPIC, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
List<String> messages = new ArrayList<>();
while (it.hasNext())
messages.add(new String(it.next().message()));
return messages;
}
Der Code ist in der Lage, die Nachrichten zu erhalten, aber wenn es die letzte Nachricht verarbeitet es in der Leitung bleibt:
while (it.hasNext())
Die Frage ist, wie kann ich alle bekommen die Nachrichten vom Kafka, stoppe den Stream und fahre mit meinen anderen Aufgaben fort.
Ich hoffe, dass Sie mir helfen können,
Dank
Mögliches Duplikat von [Kafka Consumer hängt bei .hasNext in Java] (http://stackoverflow.com/questions/28449851/kafka-consumer-hanging-at-hasnext-in-java) – zaynetro
Aber ich denke nicht, dass das Beste Um dies zu tun, warten Sie, bis eine Ausnahme ausgelöst wird.Was passiert, wenn mein Prozess länger dauert als das konfigurierte Timeout? –
Solltest du nicht einen direkten Kafka Consumer anstelle eines KafkaStreams benutzen? Ein Strom würde natürlich am Leben bleiben. – NegatioN