Ich arbeite an einem POC, der Nachrichten von Kafka lesen und durch Storm in Echtzeit verarbeiten wird. Ich habe einen lokalen Zoowärter und Kafka gegründet. Ich habe ein Thema (mit dem Namen test
), Produzent und Verbraucher erstellt und sie funktionieren gut über die Eingabeaufforderung. Jetzt wollte ich die Nachrichten aus dem Thema mit Storm lesen. Wenn ich versuche, den unten stehenden Code auszuführen, wird der Storm-Auslauf nicht mit dem Kafka/Zookeeper verbunden. Dies ist aus dem Protokoll ersichtlich, da localhost oder 2181 nirgends erwähnt wird. Und der Prozess nicht mit der AusnahmeApache Storm (lokal) keine Verbindung zu Apache Kafka (lokal)
6939 [Thema-15-eventsEmitter-Executor [2 2]] INFO oaskPartitionManager - Lesen Sie Partitionsinformationen aus:/test/Sturm/partition_0 -> null
public class TestTopology {
public static void main(String[] args) {
BrokerHosts zkHosts = new ZkHosts("localhost:2181");
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "test", "/test", "storm");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("eventsEmitter", kafkaSpout, 1);
builder.setBolt("eventsProcessor", new WordCountBolt(), 1).shuffleGrouping("eventsEmitter");
Config config = new Config();
config.setMaxTaskParallelism(5);
/*
* config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2);
*
* config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
* config.put(Config.STORM_ZOOKEEPER_SERVERS,
* Arrays.asList("localhost"));
*/
try {
ILocalCluster cls = new LocalCluster();
cls.submitTopology("my-topology", config, builder.createTopology());
} catch (Exception e) {
throw new IllegalStateException("Couldn't initialize the topology",
e);
}
}
}
Es verbindet die lokale ZooKeeper die ihre Erstellung und nicht auf die eine, die die Kafka
4632 [Thread-11] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting
4632 [Thread-11] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 wa[email protected]acd1da
4633 [Thread-11-SendThread(127.0.0.1:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
4634 [Thread-11-SendThread(127.0.0.1:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2000, initiating session
4634 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:62287
4634 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:62287
4635 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x154d458c4130011 with negotiated timeout 20000 for client /127.0.0.1:62287
4635 [Thread-11-SendThread(127.0.0.1:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2000, sessionid = 0x154d458c4130011, negotiated timeout = 20000
4635 [Thread-11-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
Bitte lassen Sie mich läuft, wenn Sie weitere Informationen benötigen.
Welche Ausnahme erleben Sie? – theDima