2016-06-01 12 views
1

Ich möchte ein LocalCluster() instanziieren und verhindern, dass es seinen eigenen eingebetteten zoekeeper ausführt, und stattdessen meinen verwenden.So geben Sie zookeeper-Informationen bei der Instanziierung von Apache Storm ein LocalCluster

In Bezug auf dieses Problem: "https://issues.apache.org/jira/browse/STORM-213" ist es für Version 0.9.3 gelöst.

kann ich einen Beispielcode dafür haben?

PS: Ich integriere meine Sturm Topologie testen, und ich benutze Kafka und Tierpfleger als Eingabe für Sturm. wenn ich localcluster nicht die Tierpfleger info angeben, erhalte ich diese Ausnahme in Zeile „LocalCluster localCluster = new LocalCluster()“:

2016-06-08 12:16:56,785 WARN [Thread-30] jmx.MBeanRegistry (MBeanRegistry.java:register(100)) - Failed to register MBean StandaloneServer_port-1 
2016-06-08 12:16:56,785 WARN [Thread-30] server.ZooKeeperServer (ZooKeeperServer.java:registerJMX(387)) - Failed to register with JMX 
javax.management.InstanceAlreadyExistsException: org.apache.ZooKeeperService:name0=StandaloneServer_port-1 
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) 
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) 
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) 
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) 
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) 
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) 
    at org.apache.zookeeper.jmx.MBeanRegistry.register(MBeanRegistry.java:96) 
    at org.apache.zookeeper.server.ZooKeeperServer.registerJMX(ZooKeeperServer.java:377) 
    at org.apache.zookeeper.server.ZooKeeperServer.startup(ZooKeeperServer.java:410) 
    at org.apache.zookeeper.server.NIOServerCnxnFactory.startup(NIOServerCnxnFactory.java:123) 

und wenn ich „storm.zookeeper.servers“ und „Sturm angeben .zookeeper.port“zu lokalen Cluster, erhalte ich unter Ausnahme bei "localCluster.submitTopology()" Linie:

EndOfStreamException: Unable to read additional data from client sessionid 0x1552f0890b70000, likely client has closed socket 
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) 
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) 
    at java.lang.Thread.run(Thread.java:745) 

java.lang.NullPointerException 
    at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:26) 
    at org.apache.storm.testing$submit_local_topology.invoke(testing.clj:301) 
    at org.apache.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:49) 
    at org.apache.storm.LocalCluster.submitTopology(Unknown Source) 
+0

Ich denke, es ist ausreichend, die Eigenschaft "storm.zookeeper.server" in Ihrer Config anzugeben, die Sie an LocalCluster übergeben, damit es funktioniert. –

+0

Tanx für Ihre Antwort, ich habe es getan, und auch "storm.zookeeper.port", aber wenn ich die Topologie einreiche, bekomme ich eine NullPointerException! ohne mehr Details! – Mohammad

+0

Können Sie den Stack-Trace teilen? –

Antwort

0

Sie den LocalCluster Konstruktor Überlastung verwenden können.

LocalCluster cluster = new LocalCluster("localhost", 2181L); 
0

konnte ich Kafka als Eingang eines Sturm Topologie an dieser Verbindung http://storm.apache.org/releases/1.0.2/storm-kafka.html

I eine bestimmte Java-Klasse für den Sturm Config

erstellt
public class StormConfig { 

private String zooKeeperConnect; 

public StormConfig() { 

} 
public KafkaSpout getkafkaSpout(String topic){ 
    return new KafkaSpout(this.getSpoutConfig(topic)); 
} 

public SpoutConfig getSpoutConfig(String topic) { 
    SpoutConfig spoutConfig=new SpoutConfig(this.getZkHosts(), topic, "", topic); 
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
    spoutConfig.startOffsetTime=kafka.api.OffsetRequest.EarliestTime(); 
    return spoutConfig; 
} 

public ZkHosts getZkHosts() { 
    return new ZkHosts(getZooKeeperConnect()); 
} 

public String getZooKeeperConnect() { 
    return zooKeeperConnect; 
} 

public void setZooKeeperConnect(String zooKeeperConnect) { 
    this.zooKeeperConnect = zooKeeperConnect; 
} 
} 
bereitgestellt durch Verwenden der Konfiguration verwenden

Dann habe ich die Methode aus dieser Klasse verwendet, wenn ich den ersten Auslauf der Topologie erstellt habe:

+0

Wirklich danke! So hilfreich. –