2016-06-20 13 views
15

Ich möchte sicherstellen, ob der kafka Server läuft oder nicht, bevor Sie Produktions- und Verbrauchsjobs starten. Es ist in Windows-Umgebung und hier ist mein kafka Servers Code in Eclipse ...Wie überprüft man, ob Kafka Server läuft?

Properties kafka = new Properties(); 
kafka.setProperty("broker.id", "1"); 
kafka.setProperty("port", "9092"); 
kafka.setProperty("log.dirs", "D://workspace//"); 
kafka.setProperty("zookeeper.connect", "localhost:2181");  
Option<String> option = Option.empty(); 
KafkaConfig config = new KafkaConfig(kafka);   
KafkaServer server = new KafkaServer(config, new CurrentTime(), option); 
server.startup(); 

In diesem Fall if (server != null) ist nicht genug, denn es ist immer wahr. Also gibt es eine Möglichkeit zu wissen, dass mein Kafka-Server läuft und bereit für den Produzenten ist. Ich muss das überprüfen, weil einige Startdatenpakete verloren gehen.

Danke.

+0

Was OS sind Sie verwenden zu produzieren oder zu konsumieren? – Maroun

+0

Ich habe meine Frage aktualisiert. – Khan

Antwort

12

All Kafka Makler ein broker.id zugewiesen werden müssen. Beim Start erstellt ein Broker einen Ephemeralknoten in Zookeeper mit dem Pfad /broker/ids/$id. Da der Knoten flüchtig ist, wird er entfernt, sobald der Broker die Verbindung trennt, z. indem man herunterfährt.

Sie können wie so die Liste der ephemeren Broker-Knoten anzuzeigen:

echo dump | nc localhost 2181 | grep brokers

Die ZooKeeper Client-Schnittstelle eine Reihe von Befehlen aussetzt; dump listet alle Sitzungen und ephemeren Knoten für den Cluster auf.

Hinweis nimmt das oben:

  • Du läufst ZooKeeper auf dem Standard-Port (2181) auf localhost, und dass localhost ist der Marktführer für den Cluster
  • Ihre zookeeper.connect Kafka Config nicht geben Sie eine chroot env für Ihren Kafka Cluster dh es ist nur host:port und nicht host:port/path
0

aus Java, können Sie einfach einen Anruf tätigen zu diesem Befehl:

[[email protected] hsperfdata_kafka]$ /usr/bin/kafka status 
Kafka is running with PID=17850. 

Wickeln Sie es in einem Versuch-fangen. Der Ausnahmeblock wird ausgeführt, wenn Probleme mit kafka aufgetreten sind. Verwenden Sie ein "finally", um alles in Ihrem Java-Programm zu bereinigen.

Ich werde versuchen, ein Codebeispiel ist sehr gute Antwort

+0

können Sie bitte Probe hier teilen –

1

Paul zu teilen und es ist tatsächlich, wie Kafka & Zk zusammen Sicht von einem Broker Punkt arbeiten.

Ich würde sagen, dass eine weitere einfache Möglichkeit zu überprüfen, ob ein Kafka-Server ausgeführt wird, ein einfaches KafkaConsumer zu schaffen, um die cluste zeigen und eine Aktion versuchen, zum Beispiel listTopics().Wenn der kafka Server nicht läuft, erhalten Sie eine TimeoutException und dann können Sie einen try-catch Satz verwenden.

def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = { 
    val props = new Properties() 
    props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString) 
    props.put("group.id", kafkaParams.get("group.id").get.toString) 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    val simpleConsumer = new KafkaConsumer[String, String](props) 
    simpleConsumer.listTopics() 
    } 
1

Die gute Option ist Adminclient zu verwenden, wie unten, bevor die Nachrichten

private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;   
try (AdminClient client = AdminClient.create(properties)) { 
      client.listTopics(new ListTopicsOptions().timeoutMs(ADMIN_CLIENT_TIMEOUT_MS)).listings().get(); 
     } catch (ExecutionException ex) { 
      LOG.error("Kafka is not available, timed out after {} ms", ADMIN_CLIENT_TIMEOUT_MS); 
      return; 
     }