2016-07-29 72 views
1

Ich muss einen Job in der Nacht ausführen, die alle Nachrichten in einer Kafka-Warteschlange erhalten und einen Prozess mit ihnen ausführen wird. Ich kann die Nachrichten abrufen, aber der Kafka-Stream wartet auf weitere Nachrichten und ich kann meinen Prozess nicht fortsetzen. Ich habe den folgenden Code:Alle Kafka-Nachrichten in eine Warteschlange und stoppen Streaming in Java

... 
private ConsumerConnector consumerConnector; 
private final static String TOPIC = "test"; 

public MessageStreamConsumer() { 
     Properties properties = new Properties(); 
     properties.put("zookeeper.connect", "localhost:2181"); 
     properties.put("group.id", "test-group"); 
     ConsumerConfig consumerConfig = new ConsumerConfig(properties); 
     consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); 
    } 
public List<String> getMessages() { 
       Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
       topicCountMap.put(TOPIC, new Integer(1)); 
       Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector 
         .createMessageStreams(topicCountMap); 
       KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0); 
       ConsumerIterator<byte[], byte[]> it = stream.iterator(); 
       List<String> messages = new ArrayList<>(); 
       while (it.hasNext()) 
        messages.add(new String(it.next().message())); 
       return messages; 
      } 

Der Code ist in der Lage, die Nachrichten zu erhalten, aber wenn es die letzte Nachricht verarbeitet es in der Leitung bleibt:

while (it.hasNext()) 

Die Frage ist, wie kann ich alle bekommen die Nachrichten vom Kafka, stoppe den Stream und fahre mit meinen anderen Aufgaben fort.

Ich hoffe, dass Sie mir helfen können,

Dank

+0

Mögliches Duplikat von [Kafka Consumer hängt bei .hasNext in Java] (http://stackoverflow.com/questions/28449851/kafka-consumer-hanging-at-hasnext-in-java) – zaynetro

+0

Aber ich denke nicht, dass das Beste Um dies zu tun, warten Sie, bis eine Ausnahme ausgelöst wird.Was passiert, wenn mein Prozess länger dauert als das konfigurierte Timeout? –

+0

Solltest du nicht einen direkten Kafka Consumer anstelle eines KafkaStreams benutzen? Ein Strom würde natürlich am Leben bleiben. – NegatioN

Antwort

0

Es scheint, dass kafka Strom nicht von Anfang an konsumieren nicht unterstützt.
Sie könnten einen nativen Kafka-Consumer erstellen und auto.offset.reset auf früheste setzen, dann wird es Nachricht von Anfang an konsumieren.

0

So etwas mag funktionieren. Grundsätzlich besteht die Idee darin, einen Kafka Consumer und eine Umfrage zu verwenden, bis Sie eine Aufzeichnung erhalten und dann aufhören, wenn Sie eine leere Charge erhalten.

package kafka.examples; 

import java.text.DateFormat; 
import java.text.SimpleDateFormat; 
import java.util.Calendar; 
import java.util.Collections; 
import java.util.Date; 
import java.util.Properties; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.atomic.AtomicBoolean; 

import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 


public class Consumer1 extends Thread 
{ 
    private final KafkaConsumer<Integer, String> consumer; 
    private final String topic; 
    private final DateFormat df; 
    private final String logTag; 
    private boolean noMoreData = false; 
    private boolean gotData = false; 
    private int messagesReceived = 0; 
    AtomicBoolean isRunning = new AtomicBoolean(true); 
    CountDownLatch shutdownLatch = new CountDownLatch(1); 

    public Consumer1(Properties props) 
    { 
     logTag = "Consumer1"; 

     consumer = new KafkaConsumer<>(props); 
     this.topic = props.getProperty("topic"); 
     this.df = new SimpleDateFormat("HH:mm:ss"); 

     consumer.subscribe(Collections.singletonList(this.topic)); 
    } 

    public void getMessages() { 
     System.out.println("Getting messages..."); 
     while (noMoreData == false) { 
      //System.out.println(logTag + ": Doing work..."); 

      ConsumerRecords<Integer, String> records = consumer.poll(1000); 
      Date now = Calendar.getInstance().getTime(); 
      int recordsCount = records.count(); 
      messagesReceived += recordsCount; 
      System.out.println("recordsCount: " + recordsCount); 
      if (recordsCount > 0) { 
       gotData = true; 
      } 

      if (gotData && recordsCount == 0) { 
       noMoreData = true; 
      } 

      for (ConsumerRecord<Integer, String> record : records) { 
       int kafkaKey = record.key(); 
       String kafkaValue = record.value(); 
       System.out.println(this.df.format(now) + " " + logTag + ":" + 
         " Received: {" + kafkaKey + ":" + kafkaValue + "}" + 
         ", partition(" + record.partition() + ")" + 
         ", offset(" + record.offset() + ")"); 
      } 
     } 
     System.out.println("Received " + messagesReceived + " messages"); 
    } 

    public void processMessages() { 
     System.out.println("Processing messages..."); 
    } 

    public void run() { 
     getMessages(); 
     processMessages(); 
    } 
} 
0

Ich entwickle derzeit mit Kafka 0.10.0.1 und gemischten Informationen über die Verwendung von Verbrauchern Eigentum auto.offset.reset gefunden so habe ich einige Experimente durchgeführt, um herauszufinden, was tatsächlich passiert.

auf solche auf Basis verstehe ich es jetzt so: wenn Sie Eigenschaft:

auto.offset.reset=earliest 

dies den Verbraucher positioniert ENTWEDER zugewiesen die erste verfügbare Nachricht in den Trennwänden (wenn keine Commits auf die gemacht wurden parations) ODER es positioniert den Verbraucher bei den letzten festgeschriebenen Partitionsversätzen (beachten Sie, dass Sie immer den letzten Leseoffset + 1 festschreiben, sonst lesen Sie die letzte festgeschriebene Nachricht bei jedem Neustart des Verbrauchers erneut)

Oder Sie nicht setzen auto.offset.reset welche mea Es wird der Standardwert von 'neustes' verwendet.

In diesem Fall erhalten Sie keine alten Nachrichten beim Verbinden der Consumer-Only-Nachrichten, die nach dem Verbinden des Konsumenten zum Thema veröffentlicht werden, werden empfangen.

Als eine Schlussfolgerung - wenn Sie sicherstellen möchten, alle verfügbaren Nachrichten für ein bestimmtes Thema und zugewiesene Partitionen zu erhalten, müssen Sie seekToBeginning() aufrufen.

Es scheint geraten Umfrage zu nennen (0L) zuerst die Verbraucher zugewiesen bekommt Partitionen zu gewährleisten, wird jede der zugeordneten Partitionen ‚Anfang‘ suchen (oder Code in der ConsumerRebalanceListener zu implementieren!):

kafkaConsumer.poll(0L); 
kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());