2013-02-21 16 views
6

Ich versuche, meine Storm-Topologie, die einen KafkaSpout verwendet, neu zu balancieren. Mein Code ist:Storm Topology Rebalance mit Java-Code

TopologyBuilder builder = new TopologyBuilder(); 
    Properties kafkaProps = new Properties(); 
    kafkaProps.put("zk.connect", "localhost:2181"); 
    kafkaProps.put("zk.connectiontimeout.ms", "1000000"); 
    kafkaProps.put("groupid", "storm"); 

    builder.setSpout("kafkaSpout" , new KafkaSpout(kafkaProps, "test"), 3); 
    builder.setBolt("eventBolt", new EventBolt(), 2).shuffleGrouping("kafkaSpout", "eventStream"); 
    builder.setBolt("tableBolt", new TableBolt(), 2).shuffleGrouping("kafkaSpout", "tableStream"); 

    Map<String, Object> conf = new HashMap<String, Object>(); 
    conf.put(Config.TOPOLOGY_DEBUG, true); 

    LocalCluster cluster = new LocalCluster(); 
    cluster.submitTopology("test", conf, builder.createTopology()); 

    Utils.sleep(1000*5); 

    List<TopologySummary> topologySummaries = cluster.getClusterInfo().get_topologies(); 
    for (TopologySummary summary : topologySummaries) { 
     StormTopology topology = cluster.getTopology(summary.get_id()); 
     RebalanceOptions options = new RebalanceOptions(); 
     options.set_wait_secs(0); 
     options.set_num_workers(4); 

     for (String name : topology.get_bolts().keySet()) { 
      System.err.println(name + " " + topology.get_bolts().get(name).get_common().get_json_conf()); 
      options.put_to_num_executors(name , 5); 
     } 
     for (String name : topology.get_spouts().keySet()) { 
      System.err.println(name + " " + topology.get_spouts().get(name).get_common().get_json_conf()); 
      options.put_to_num_executors(name , 5); 
     } 

     cluster.rebalance(summary.get_name() , options); 
    } 

jedoch während Ausgleich wieder, folgende Fehler Spur wird angezeigt:

10341 [storm_rishabh-1361473654345-95461d10_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-95461d10 begin rebalancing consumer storm_rishabh-1361473654345-95461d10 try #1 
10341 [storm_rishabh-1361473654345-3b26ed76_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-3b26ed76 begin rebalancing consumer storm_rishabh-1361473654345-3b26ed76 try #1 
10342 [storm_rishabh-1361473654345-95461d10_watcher_executor] ERROR kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-95461d10 error during syncedRebalance 
java.lang.NullPointerException: null 
at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:181) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:202) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:447) ~[kafka_2.9.2-0.7.0.jar:na] 
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:78) ~[scala-library-2.9.2.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:444) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:401) ~[kafka_2.9.2-0.7.0.jar:na] 
10342 [storm_rishabh-1361473654345-3b26ed76_watcher_executor] ERROR kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-3b26ed76 error during syncedRebalance 
java.lang.NullPointerException: null 
at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:181) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:202) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:447) ~[kafka_2.9.2-0.7.0.jar:na] 
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:78) ~[scala-library-2.9.2.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:444) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:401) ~[kafka_2.9.2-0.7.0.jar:na] 
10342 [storm_rishabh-1361473654345-95461d10_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-95461d10 stopping watcher executor thread for consumer storm_rishabh-1361473654345-95461d10 
10343 [storm_rishabh-1361473654345-3b26ed76_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-3b26ed76 stopping watcher executor thread for consumer storm_rishabh-1361473654345-3b26ed76 

Kann mir bitte jemand sagen, was das Problem sein kann? Muss ich etwas mehr in kafkaSpout definieren, damit es zum Zeitpunkt des Rebalancing ordnungsgemäß heruntergefahren und dann neu gestartet wird?

Antwort

0

Ich hatte das gleiche Problem beim Ausführen in einem LocalCluster (für Entwicklungszwecke). Ich habe meine Testkonfiguration YAML geändert, um die Anzahl der Worker auf 1:

zu reduzieren
topology.workers: 1 

Dies behebt das Problem. Ich habe noch nicht versucht, dies auf einem tatsächlichen verteilten Cluster zu tun, so dass ich nicht weiß, ob dies nur ein Artefakt der Ausführung im LocalCluster Modus ist oder nicht.

(In meinem Code nie LocalCluster.rebalance rufe ich.)

0

Verwenden Sturm Neuverteilung Befehl von Vorgesetzten oder Nimbus Knoten.

Zum Beispiel: storm rebalance mytopology -n 5 -e blau-tülle = 3 -e yellow-bolt = 10.

Bitte beziehen Sie sich auf diese Website. www.michael-noll.com.