2016-05-21 9 views
0

Ich arbeite an einem POC, der Nachrichten von Kafka lesen und durch Storm in Echtzeit verarbeiten wird. Ich habe einen lokalen Zoowärter und Kafka gegründet. Ich habe ein Thema (mit dem Namen test), Produzent und Verbraucher erstellt und sie funktionieren gut über die Eingabeaufforderung. Jetzt wollte ich die Nachrichten aus dem Thema mit Storm lesen. Wenn ich versuche, den unten stehenden Code auszuführen, wird der Storm-Auslauf nicht mit dem Kafka/Zookeeper verbunden. Dies ist aus dem Protokoll ersichtlich, da localhost oder 2181 nirgends erwähnt wird. Und der Prozess nicht mit der AusnahmeApache Storm (lokal) keine Verbindung zu Apache Kafka (lokal)

6939 [Thema-15-eventsEmitter-Executor [2 2]] INFO oaskPartitionManager - Lesen Sie Partitionsinformationen aus:/test/Sturm/partition_0 -> null

public class TestTopology { 

    public static void main(String[] args) { 

     BrokerHosts zkHosts = new ZkHosts("localhost:2181"); 
     SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "test", "/test", "storm"); 
     kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
     KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig); 
     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout("eventsEmitter", kafkaSpout, 1); 
     builder.setBolt("eventsProcessor", new WordCountBolt(), 1).shuffleGrouping("eventsEmitter"); 
     Config config = new Config(); 
     config.setMaxTaskParallelism(5); 
     /* 
     * config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2); 
     * 
     * config.put(Config.STORM_ZOOKEEPER_PORT, 2181); 
     * config.put(Config.STORM_ZOOKEEPER_SERVERS, 
     * Arrays.asList("localhost")); 
     */ 

     try { 
      ILocalCluster cls = new LocalCluster();   
      cls.submitTopology("my-topology", config, builder.createTopology()); 
     } catch (Exception e) { 
      throw new IllegalStateException("Couldn't initialize the topology", 
        e); 
     } 
    } 

} 

Es verbindet die lokale ZooKeeper die ihre Erstellung und nicht auf die eine, die die Kafka

4632 [Thread-11] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 
4632 [Thread-11] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 wa[email protected]acd1da 
4633 [Thread-11-SendThread(127.0.0.1:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error) 
4634 [Thread-11-SendThread(127.0.0.1:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2000, initiating session 
4634 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:62287 
4634 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:62287 
4635 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x154d458c4130011 with negotiated timeout 20000 for client /127.0.0.1:62287 
4635 [Thread-11-SendThread(127.0.0.1:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2000, sessionid = 0x154d458c4130011, negotiated timeout = 20000 
4635 [Thread-11-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 

Bitte lassen Sie mich läuft, wenn Sie weitere Informationen benötigen.

+0

Welche Ausnahme erleben Sie? – theDima

Antwort

0

Sie müssen den config so konfigurieren, dass er den Kafka-Server-Port wissen werden, zum Beispiel wie folgt:

Properties props = new Properties(); 
//default broker port = 9092 
props.put("metadata.broker.list", "localhost:" + BROKER_PORT); 
props.put("request.required.acks", "1"); 
props.put("serializer.class", "kafka.serializer.StringEncoder"); 

Config config = new Config();   
config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props); 
config.setDebug(true); 
config.setMaxTaskParallelism(5); 
+0

@ Matthias, @ Dima beantwortete meine eigene Frage. Anscheinend war es meine Albernheit. Danke für deine Kommentare. Bin dankbar. – mavrav

0

Nach einer nervenaufreibenden Nacht habe ich dafür auf die Lösung getroffen. Eigentlich lag das Problem nicht beim Code, sondern bei den Jars. Ich hatte log4j Gläser aus allen 3 Paketen nämlich Tierpfleger, Kafka und Sturm angeschlossen. Aber der Code war nur eine erwartet. Dies war eine rote Warnung in meiner Sonnenfinsternis, die ich zuvor ignoriert hatte. Als ich die überflüssigen log4js entfernte, fing der Kafka-Ausguß an, vom Kafka-Thema zu lesen, das ich erstellt hatte. Ich danke Ihnen allen, dass Sie sich die Zeit genommen haben, sich mit diesem Thema zu befassen. @Matthias Ich nehme an, seit ich es mit dem Zookeeper verlinkt habe, verbindet es sich mit dem, was kafka von diesem Zookeeper verwaltet wird. Also erwähnen, dass möglicherweise mindestens auf lokaler Ebene nicht notwendig. Aber trotzdem danke ..