Ich implementiere eine IBackingMap für meine Trident-Topologie, um Tupel für ElasticSearch zu speichern (ich weiß, dass es mehrere Implementierungen für die Integration von Trident/ElasticSearch gibt, die bereits bei GitHub existieren, aber ich habe mich entschlossen, ein benutzerdefiniertes zu implementieren, das meiner Aufgabe besser entspricht).Wie schließe ich eine Datenbankverbindung, die von einer IBackingMap-Implementierung innerhalb einer Storm Trident-Topologie geöffnet wurde?
So ist meine Implementierung ein Klassiker mit einer Fabrik:
public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> {
// omitting here some other cool stuff...
private final Client client;
public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) {
return new StateFactory() {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName);
CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE);
MapState ms = OpaqueMap.build(cm);
return new SnapshottableMap(ms, new Values(GLOBAL_KEY));
}
};
}
public ElasticSearchBackingMap(String host, int port, String clusterName) {
Settings settings = ImmutableSettings.settingsBuilder()
.put("cluster.name", clusterName).build();
// TODO add a possibility to close the client
client = new TransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(host, port));
}
// the actual implementation is left out
}
Sie sehen es Host/Port/Clusternamen als Eingabe params und erstellt ein Elasticsearch Client als Mitglied der Klasse ABER ES nie geschlossen wird DER KUNDE.
Es dann innerhalb einer Topologie in einer ziemlich gewohnter Weise verwendet wird:
tridentTopology.newStream("spout", spout)
// ...some processing steps here...
.groupBy(aggregationFields)
.persistentAggregate(
ElasticSearchBackingMap.getFactoryFor(
ElasticSearchConfig.ES_HOST,
ElasticSearchConfig.ES_PORT,
ElasticSearchConfig.ES_CLUSTER_NAME
),
new Fields(FieldNames.OUTCOME),
new BatchAggregator(),
new Fields(FieldNames.AGGREGATED));
Diese Topologie wird in einige in einem Glas verpackt public static void main, gewickelt und schickte zur Ausführung stürmen.
Die Frage ist, sollte ich mir Sorgen machen über das Schließen der ElasticSearch-Verbindung oder es ist Storms eigenes Geschäft? Wenn es nicht von Storm gemacht wird, wie und wann im Lebenszyklus der Topologie sollte ich das tun?
Vielen Dank im Voraus!
TransportClient sollte für jeden Sturmarbeiter ein Singleton sein. [user-list] (http://elasticsearch-users.115913.n3.nabble.com/What-is-your-best-practice-to-access-a-cluster-by-a-Java-client-td4015311. html). Eigentlich denke ich, dass Sie den Java-Client nicht schließen müssen, weil die Topologie niemals aufhören sollte. – fhussonnois
Ein Hack könnte sein: Erstelle ein Singleton für jeden Arbeiter, z.B. Wenn Sie den ersten Status erstellen und diesen Singleton in der Bereinigungsmethode Ihres Aggregators schließen, sehe ich in Ihrem Code "BatchAggregator". Aber ich würde auch gerne eine bessere Lösung sehen ... – dedek
Siehe auch diese Feature-Anfrage: https://issues.apache.org/jira/browse/STORM-49 – dedek