2015-02-26 11 views
10

Ich implementiere eine IBackingMap für meine Trident-Topologie, um Tupel für ElasticSearch zu speichern (ich weiß, dass es mehrere Implementierungen für die Integration von Trident/ElasticSearch gibt, die bereits bei GitHub existieren, aber ich habe mich entschlossen, ein benutzerdefiniertes zu implementieren, das meiner Aufgabe besser entspricht).Wie schließe ich eine Datenbankverbindung, die von einer IBackingMap-Implementierung innerhalb einer Storm Trident-Topologie geöffnet wurde?

So ist meine Implementierung ein Klassiker mit einer Fabrik:

public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> { 

    // omitting here some other cool stuff... 
    private final Client client; 

    public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) { 

     return new StateFactory() { 

      @Override 
      public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { 

       ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName); 
       CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE); 
       MapState ms = OpaqueMap.build(cm); 
       return new SnapshottableMap(ms, new Values(GLOBAL_KEY)); 
      } 
     }; 
    } 

    public ElasticSearchBackingMap(String host, int port, String clusterName) { 

     Settings settings = ImmutableSettings.settingsBuilder() 
       .put("cluster.name", clusterName).build(); 

     // TODO add a possibility to close the client 
     client = new TransportClient(settings) 
       .addTransportAddress(new InetSocketTransportAddress(host, port)); 
    } 

    // the actual implementation is left out 
} 

Sie sehen es Host/Port/Clusternamen als Eingabe params und erstellt ein Elasticsearch Client als Mitglied der Klasse ABER ES nie geschlossen wird DER KUNDE.

Es dann innerhalb einer Topologie in einer ziemlich gewohnter Weise verwendet wird:

tridentTopology.newStream("spout", spout) 
      // ...some processing steps here... 
      .groupBy(aggregationFields) 
      .persistentAggregate(
        ElasticSearchBackingMap.getFactoryFor(
          ElasticSearchConfig.ES_HOST, 
          ElasticSearchConfig.ES_PORT, 
          ElasticSearchConfig.ES_CLUSTER_NAME 
        ), 
        new Fields(FieldNames.OUTCOME), 
        new BatchAggregator(), 
        new Fields(FieldNames.AGGREGATED)); 

Diese Topologie wird in einige in einem Glas verpackt public static void main, gewickelt und schickte zur Ausführung stürmen.

Die Frage ist, sollte ich mir Sorgen machen über das Schließen der ElasticSearch-Verbindung oder es ist Storms eigenes Geschäft? Wenn es nicht von Storm gemacht wird, wie und wann im Lebenszyklus der Topologie sollte ich das tun?

Vielen Dank im Voraus!

+0

TransportClient sollte für jeden Sturmarbeiter ein Singleton sein. [user-list] (http://elasticsearch-users.115913.n3.nabble.com/What-is-your-best-practice-to-access-a-cluster-by-a-Java-client-td4015311. html). Eigentlich denke ich, dass Sie den Java-Client nicht schließen müssen, weil die Topologie niemals aufhören sollte. – fhussonnois

+1

Ein Hack könnte sein: Erstelle ein Singleton für jeden Arbeiter, z.B. Wenn Sie den ersten Status erstellen und diesen Singleton in der Bereinigungsmethode Ihres Aggregators schließen, sehe ich in Ihrem Code "BatchAggregator". Aber ich würde auch gerne eine bessere Lösung sehen ... – dedek

+0

Siehe auch diese Feature-Anfrage: https://issues.apache.org/jira/browse/STORM-49 – dedek

Antwort

3

Okay, beantworte meine eigene Frage.

Zuallererst, noch einmal vielen Dank @dedek für Vorschläge und die Wiederbelebung des Tickets in Storm's Jira.

Schließlich, da es keine offizielle Möglichkeit gibt, das zu tun, habe ich mich entschieden, für cleanup() Methode von Tridents Filter zu gehen. Bisher habe ich überprüft die folgenden (für Storm v 0.9.4.):

Mit LocalCluster

  • Bereinigung() wird auf Clusters Abschaltung aufgerufen
  • Bereinigung() nicht erhalten aufgerufen, wenn die Topologie zu töten, das ist nicht eine Tragödie sein soll, wird sehr wahrscheinlich ein LocalCluster ohnehin nicht für reale Einsätze verwenden

Mit einem realen Cluster

  • es aufgerufen wird, wenn die Topologie sowie, wenn der Arbeiter -f ‚backtype.storm.daemon.worker‘
  • es bekommt pkill -TERM -u Gewitter gestoppt getötet wird, nicht genannt, wenn der Arbeiter mit töten getötet -9 oder wenn er abstürzt oder - leider - wenn der Arbeitnehmer auf eine Ausnahme aufgrund stirbt

insgesamt, die mehr oder weniger anständige Garantie für Bereinigung() zu erhalten genannt gibt , vorausgesetzt, Sie werden mit der Ausnahmebehandlung vorsichtig sein (ich neige dazu, jedem meiner Dreizack 'Donnerfänge' hinzuzufügen Primitive sowieso).

Mein Code:

public class CloseFilter implements Filter { 

    private static final Logger LOG = LoggerFactory.getLogger(CloseFilter.class); 

    private final Closeable[] closeables; 

    public CloseFilter(Closeable... closeables) { 
     this.closeables = closeables; 
    } 

    @Override 
    public boolean isKeep(TridentTuple tuple) { 
     return true; 
    } 

    @Override 
    public void prepare(Map conf, TridentOperationContext context) { 

    } 

    @Override 
    public void cleanup() { 
     for (Closeable c : closeables) { 
      try { 
       c.close(); 
      } catch (Exception e) { 
       LOG.warn("Failed to close an instance of {}", c.getClass(), e); 
      } 
     } 
    } 
} 

wäre jedoch schön, wenn einige Tage Haken zum Schließen von Verbindungen ein Teil der API werden.