2016-05-27 10 views
10

Gerade jetzt meine Trennung von einem WebLogic JMS-Server sieht wie folgt ausvon einem WebLogic Trennen JMS

import java.util.Hashtable; 
import javax.jms.*; 
import javax.naming.*; 
import javax.transaction.*; 
import java.util.Vector; 
import javax.rmi.PortableRemoteObject; 
import clojure.java.api.Clojure; 
import clojure.lang.IFn; 
import org.apache.log4j.Logger; 
import weblogic.jndi.*; 

public class WebLogicListener implements MessageListener, ExceptionListener{ 
    public InitialContext ctx; 
    public TopicConnectionFactory conFactory; 
    public TopicConnection tCon; 
    public TopicSession tSession; 
    public TopicSubscriber tSub; 
    public Boolean development; 
    public Topic topic; 
    /*clojure function objects*/ 
    public IFn publish; 
    public IFn close; 
    public IFn incrementMetric; 
    public IFn logMessage; 
    public IFn resync; 

    public Object channel; 
    public ExceptionListener exception; 
    public String topicName; 
    public String subName; 
    public String username; 
    public String password; 
    public String clientId; 
    public String factoryJNDI; 
    public String topicJNDI; 
    public Vector nms; 
    public Hashtable<Object,Object> env; 
    public boolean running = false; 

    public WebLogicListener (String topicName, String host, String username, String password, String factoryJNDI, 
          String topicJNDI, String clientId, String subName, String ns, String fnName, 
          boolean development, Vector nms){ 
    this.username = username; 
    this.password = password; 
    this.clientId = clientId; 
    this.topicName = topicName; 
    this.subName = subName; 
    this.development = development; 
    this.topicJNDI = topicJNDI; 
    this.factoryJNDI = factoryJNDI; 
    this.nms = nms; 
    /*Clojure interop handlers*/ 
    IFn chan = Clojure.var("clojure.core.async", "chan"); 
    resync = Clojure.var("cenx.baldr.api", "resync!"); 
    publish = Clojure.var(ns, fnName); 
    incrementMetric = Clojure.var(ns, "log-metric"); 
    logMessage = Clojure.var (ns, "log-message"); 
    close = Clojure.var("clojure.core.async","close!"); 
    /*populate envrionment*/ 
    env = new Hashtable<Object,Object>(); 
    env.put(Context.PROVIDER_URL, host); 
    env.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory"); 
    env.put(Context.SECURITY_PRINCIPAL, username); 
    env.put(Context.SECURITY_CREDENTIALS, password); 
    env.put("weblogic.jndi.createIntermediateContexts", "true"); 
    /*open communication channel for clojure daemon*/ 
    channel = chan.invoke(); 
    } 

    private void initListener() throws JMSException, NamingException{ 
    try{ 
     if (!running && !development){ 
     ctx = new InitialContext(env); 
     topic = (Topic) ctx.lookup(topicJNDI); 
     conFactory = (TopicConnectionFactory)PortableRemoteObject.narrow(ctx.lookup(factoryJNDI), TopicConnectionFactory.class); 
     tCon = (TopicConnection) conFactory.createTopicConnection(); 
     tCon.setExceptionListener(this); 
     tCon.setClientID(clientId); 
     tSession = (TopicSession) tCon.createTopicSession(false, 1); 
     tSub = tSession.createDurableSubscriber(topic, subName); 
     tSub.setMessageListener(this); 
     tCon.start(); 
     running = true; 
     }else{ 
     if (running){ 
      logMessage.invoke("error", String.format("Listener is already running")); 
     } 
     if (development){ 
      logMessage.invoke("info", "Running in development mode, no connection established"); 
     } 
     } 
    } catch(Exception e){ 
     logMessage.invoke("error", String.format("Unable to start listener \n %s", e.toString())); 
    } 
    } 

    public void startListener(){ 
    if (!development && env != null){ 
     try { 
     initListener(); 
     }catch(Exception e){ 
     logMessage.invoke("error", String.format("Unable to start Listener \n %s", e.toString())); 
     } 
    } else { 
     if (development){ 
     logMessage.invoke("info", "Running in development mode, no connection established"); 
     } 
     if (env == null){ 
     logMessage.invoke("error", "Environment variable is null"); 
     } 
    } 
    } 

    ///Closes the JMS connection and the channel 
    public void stopListener(){ 
    if (!development){ 
     try{ 
     tSub.close(); 
     tSession.close(); 
     tCon.close(); 
     incrementMetric.invoke("JMS-disconnect-count"); 
     }catch(Exception e){ 
     logMessage.invoke("error", String.format("Error while stopping the listener \n %s", e.toString())); 
     }finally{ 
     running = false; 
     } 
    } else { 
     logMessage.invoke("info", "Listener not started, running in development mode"); 
    } 
    } 

    public Object getChannel(){ 
    return channel; 
    } 

    //re-initializes the channel in case of error 
    public void initializeChannel(){ 
    if (channel == null){ 
     IFn chan = Clojure.var("clojure.core.async", "chan"); 
     channel = chan.invoke(); 
    } else { 
     logMessage.invoke("info", "Channel is already initialized"); 
    } 
    } 
    //accessors for debugging 

    public void closeSubscription(){ 
    try{ 
     tSub.close(); 
    }catch (Exception e){ 
     logMessage.invoke("error", "unable to close topic subscription"); 
     logMessage.invoke("error", e.toString()); 
    } 
    } 

    public void closeSession(){ 
    try{ 
     tSession.unsubscribe(subName); 
     tSession.close(); 
    }catch (Exception e){ 
     logMessage.invoke("error", "unable to close topic session"); 
     logMessage.invoke("error", e.toString()); 
    } 
    } 

    public void closeConnection(){ 
    try{ 
     tCon.close(); 
    }catch (Exception e){ 
     logMessage.invoke("error", "unable to close topic connection"); 
     logMessage.invoke("error", e.toString()); 
    } 
    } 

    public void closeContext(){ 
    try { 
     ctx.close(); 
    }catch (Exception e){ 
     logMessage.invoke("error", "unable to close context"); 
     logMessage.invoke("error", e.toString()); 
    } 
    } 

    public Boolean isRunning(){ 
    return running; 
    } 

    public Context getContext(){ 
    return ctx; 
    } 

    public TopicConnectionFactory getFactory(){ 
    return conFactory; 
    } 

    public TopicConnection getTopicConnection(){ 
    return tCon; 
    } 

    public TopicSession getTopicSession(){ 
    return tSession; 
    } 

    public Boolean getDevelopmentMode(){ 
    return development; 
    } 

    public TopicSubscriber getTopicSubscriber(){ 
    return tSub; 
    } 

    public Topic getTopic(){ 
    return topic; 
    } 

    /*Interface methods*/ 

    public void onMessage(Message message){ 
    publish.invoke(channel, message); 
    } 
    /*attempt a resync after an exception connection*/ 
    private void resync(){ 
    resync.invoke(nms); 
    } 

    private void attemptReconnect() throws Exception{ 
    if (!development){ 
     //clean up any portions of the connection that managed to establish 
     stopListener(); 
     //incase of stopListener exceptioning out set running to false 
     running = false; 
     do{ 
     try{ 
      initListener(); 
      if (running){ 
      resync(); 
      } 
     }catch(Exception e){ 
      logMessage.invoke("error", 
          String.format("Unable to establish connection to JMS server \n %s", e.toString())); 
     }finally{ 
      Thread.sleep(30000); 
     } 
     } while (!running); 
    } else { 
     logMessage.invoke("info", "Running in development mode, no connection established"); 
    } 
    } 

    public void onException(JMSException e){ 
    logMessage.invoke("error", 
         String.format("A JMS Exception has occurred, attempting to re-establish topic connection \n %s", e.toString())); 
    try{ 
     incrementMetric.invoke("JMS-disconnect-count"); 
     attemptReconnect(); 
    }catch(Exception g){ 
     logMessage.invoke("error", 
         String.format("Unable to start Listener \n %s", g.toString())); 
    } 
    } 

    /* Test functions */ 
    public void testException() throws JMSException{ 
    onException(new JMSException("testing exception function")); 
    } 

    public void testChannel (String message){ 
    if (development){ 
     publish.invoke(channel, message); 
    } 
    } 
} 

Wenn ich schaffen die Verbindung I netstat verwenden, um zu überprüfen, ob der Server

netstat verbunden ist - ein | grep 8001 tcp 0 0 ip-Adresse: 59730
ip-Adresse: 8001 GEGRüNDET

Dann rufe ich meine .stopListener neben der .closeContext Methode und gehe zurück, um meine Verbindung mit netstat wieder zu überprüfen und ich das gleiche Ergebnis

netstat -an | grep 8001 tcp 0 0 ip-Adresse: 59730
ip-Adresse: 8001 GEGRüNDET

Warum die Sitzung, Teilnehmer würde zu schließen, und die Verbindung nicht die Verbindung zum JMS-Server zerstören. Die Dokumentation, die ich gefunden habe, hat mir keine Erklärung dafür gegeben, warum ich die Verbindung nicht vollständig zerstören kann.

+0

Was ist der Wert der "Entwicklung" Flagge? –

+0

ist es auf false gesetzt. Wenn es wahr wäre, wäre eine Verbindung nie hergestellt worden. Ich sehe die Log-Nachrichten von meinem endgültig blockiert beim Trennen. – jrahme

+1

Sie können überprüfen, ob die Verbindung von einer anderen Komponente hergestellt wurde, bevor Sie Ihre JMS-Verbindung/Sitzung erstellen. Vergessen Sie nicht, auch den jndi-Kontext zu schließen. –

Antwort

0

Ich bin mir nicht sicher, dass Sie sich dem richtig nähern. Ich sehe, Sie haben einen Ausnahme-Listener auf der Verbindung.

Auf Weblogic wird der Listener für jedes Fehlerereignis mehrmals aufgerufen, so dass Sie versuchen sollten, nicht bei jedem Aufruf zu versuchen. Es würde einmal für jeden registrierten Kunden und einmal für jede überwachte Verbindung aufgerufen. Sie sollten die Verbindung nur trennen, wenn die Ausnahme eine ServerConnectionLost darstellt.

Außerdem müssen Sie im Fehlerhandler nur die Verbindung schließen. Wenn Sie connection.close() ausgeführt hätten, würden die Sitzung und die Listener ebenfalls geschlossen. Es ist nicht nötig, sie in umgekehrter Reihenfolge zu schließen, wie Sie es tun.

Und noch etwas. Sie sollten in Ihrem Produktionscode keinen "Entwicklungs-" oder "Debug" - oder "Test" -Code haben.

Dieser Teil sagt "wenn (! Entwicklung & & env! = Null) {" ... Sie sollten das nicht tun.

Nun zurück zu Ihrer Frage, warum ist die eigentliche Verbindung nicht geschlossen. Ich sehe, Sie tun

try{ 
    tSub.close(); 
    tSession.close(); 
    tCon.close(); 
    incrementMetric.invoke("JMS-disconnect-count"); 
} catch... 

Wenn tSub.close() oder tSession.close() waren auf Fehler, wäre die Verbindung nie geschlossen erhalten. Wickeln Sie jede einzelne in einen unabhängigen Versuch/Fang ein.