2013-04-03 4 views
14

Ich bin mit einem Problem konfrontiert Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload beim Einreichen einer Topologie zu einem Produktionscluster mit IDE, während die gleiche Sache, wenn ich in der Befehlszeile mit storm jar Befehl ausführen, es läuft wie Himmel. Ich habe Beispiele davon aus githublink gesehen.So senden Sie eine Topologie in Sturm Produktion Cluster mit IDE

Für Topologie Absenden ich verwende diese Gruppe von Linien

conf.put(Config.NIMBUS_HOST, NIMBUS_NODE); 
    conf.put(Config.NIMBUS_THRIFT_PORT,6627); 
    conf.put(Config.STORM_ZOOKEEPER_PORT,2181); 
    conf.put(Config.STORM_ZOOKEEPER_SERVERS,ZOOKEEPER_ID); 
    conf.setNumWorkers(20); 
    conf.setMaxSpoutPending(5000); 
    StormSubmitter submitter = new StormSubmitter(); 
    submitter.submitTopology("test", conf, builder.createTopology()); 

Bitte mir vorschlagen, ob dies der richtige Ansatz ist, zu laufen?

Antwort

21

Nun, die Lösung gefunden. Als wir "storm jar" starteten, löste dies ein Eigenschaftsflag für storm.jar im übermittelten jar. Also, wenn wir ein Glas programmatisch einreichen wollen, dann einfach das Kennzeichen auf diese Weise

System.setProperty("storm.jar", <path-to-jar>);

Zum Beispiel:

System.setProperty("storm.jar", "/Users/programming/apache-storm-1.0.1/lib/storm-core-1.0.1.jar"); 
StormSubmitter.submitTopology("myTopology", config, builder.createTopology()); 
+1

Wie haben Sie die folgende Fehlermeldung 'java.lang.RuntimeException zu überwinden: Gefunden mehrere defaults.yaml Ressourcen. Vermutlich bündelst du die Storm-Gläser mit deinem Topologie-Krug. – manthosh

+2

"Mehrere defaults.yaml-Ressourcen gefunden. Wahrscheinlich werden Sie die Storm-Jars mit Ihrer Topologie-JAR-Datei bündeln." Fügen Sie die Storm-Jars nicht in Ihre Topologie-Jar ein, und wenn Sie maven verwenden, fügen Sie diese Zeile in Ihrer Sturmabhängigkeit hinzu. – abhi

+0

seine geben 'java.lang.RuntimeException: Topologie mit Namen' mytopology' existiert bereits auf Cluster' –

4

Ich habe nicht Java-Code ausführen, mich für die Vorlage, aber ich überprüft Befehl Sturm - und es ist eine Python-Datei, die Java und http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html Klasse läuft

Das einzige, was ich denke, Sie sollten darum kümmern - ist zu schließen alle benötigten Bibliotheken, wenn sie ausgeführt werden.

+0

dank alex für den Vorschlag +1 – abhi

5

Für eine Topologie zu entfernten Sturm Cluster einreichen, müssen Sie dieses Glas hochladen zu Nimbus Maschine und dann dieses Glas zu Cluster mit NimbusClient.
Sie können es wie folgt tun:

Map storm_conf = Utils.readStormConfig(); 
storm_conf.put("nimbus.host", "<Nimbus Machine IP>"); 
Client client = NimbusClient.getConfiguredClient(storm_conf) 
           .getClient(); 
String inputJar = "C:\\workspace\\TestStormRunner\\target\\TestStormRunner-0.0.1-SNAPSHOT-jar-with-dependencies.jar"; 
NimbusClient nimbus = new NimbusClient(storm_conf, "<Nimbus Machine IP>", 
           <Nimbus Machine Port>); 
// upload topology jar to Cluster using StormSubmitter 
String uploadedJarLocation = StormSubmitter.submitJar(storm_conf, 
           inputJar); 

String jsonConf = JSONValue.toJSONString(storm_conf); 
nimbus.getClient().submitTopology("testtopology", 
         <uploadedJarLocation>, jsonConf, builder.createTopology()); 

Hier ist die Arbeitsbeispiel: Submitting a topology to Remote Storm Cluster

3

ich das dieses Problem auf @abhi und @Nishu Tayal die Antworten basierend gelöst haben, würde Ich mag meine posten Code hier:

public static void submitLocalTopologyWay1(String topologyName, Config topologyConf, 
     StormTopology topology, String localJar) { 
    try { 
     //get default storm config 
     Map defaultStormConf = Utils.readStormConfig(); 
     defaultStormConf.putAll(topologyConf); 

     //set JAR 
     System.setProperty("storm.jar",localJar); 

     //submit topology 
     StormSubmitter.submitTopology(topologyName, defaultStormConf, topology); 

    } catch (Exception e) { 
     String errorMsg = "can't deploy topology " + topologyName + ", " + e.getMessage(); 
     System.out.println(errorMsg); 
     e.printStackTrace(); 
    } 
} 

public static void submitLocalTopologyWay2(String topologyName, Config topologyConf, 
     StormTopology topology, String localJar) { 
    try { 
     //get nimbus client 
     Map defaultStormConf = Utils.readStormConfig(); 
     defaultStormConf.putAll(topologyConf); 
     Client client = NimbusClient.getConfiguredClient(defaultStormConf).getClient(); 

     //upload JAR 
     String remoteJar = StormSubmitter.submitJar(defaultStormConf, localJar); 

     //submit topology 
     client.submitTopology(topologyName, remoteJar, JSONValue.toJSONString(topologyConf), topology); 

    } catch (Exception e) { 
     String errorMsg = "can't deploy topology " + topologyName + ", " + e.getMessage(); 
     System.out.println(errorMsg); 
     e.printStackTrace(); 
    } 
} 

dann hier ist ein Test, und Sie müssen Ihren Code zu einer JAR-Datei zuerst erstellen.

public void testSubmitTopologySubmitLocalTopologyWay1() { 
    Config config = new Config(); 
    config.put(Config.NIMBUS_HOST,"9.119.84.179"); 
    config.put(Config.NIMBUS_THRIFT_PORT, 6627); 
    config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("9.119.84.177","9.119.84.178","9.119.84.176")); 
    config.put(Config.STORM_ZOOKEEPER_PORT,2181); 

    config.put(Config.TOPOLOGY_WORKERS, 3); 

    RemoteSubmitter.submitLocalTopologyWay1("word-count-test-1", config, 
      WordCountTopology.buildTopology(), // your topology 
      "C:\\MyWorkspace\\project\\storm-sample-0.0.1-SNAPSHOT-jar-with-dependencies.jar");//the JAR file 
}