Ich habe einen Prozessor, der Nachrichten von Apache Kafka liest und die Daten an einen REST-Endpunkt sendet.Apache HttpClient - Out of Memory Problem
Der Server hat nur 4-Kerne und 4 GB Hauptspeicher, aus dem ein Maximum von 2 GB an den Java-Prozess zugeordnet ist
Nachrichten erzeugt werden, und mit einer Geschwindigkeit von 4 K/Sekunde verbraucht.
Nach ein paar Minuten läuft das Programm aus dem Speicher.
- Was ist der beste Weg http Rest Endpunkte asynchron rufen und nicht für die Antwort
- warten Wie die Httpclient-Verbindung verwalten? Ich hatte den Eindruck, dass ich den Client ein nie starten müssen schließen, so kann ich die Verbindung wiederverwenden
- Sie Probleme mit dem Code unten sehen Sie
public class SomeProcesor implementiert BProcessor {
private ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
private CompletionService<Boolean> pool = new ExecutorCompletionService<Boolean>(exec);
CloseableHttpAsyncClient httpclient = null ;
@Override
public void begin() {
httpclient = HttpAsyncClients.createDefault();
RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(5000).setConnectTimeout(5000).setSocketTimeout(5000).build();
HttpAsyncClients.custom().setDefaultRequestConfig(requestConfig).build();
// Start the client
httpclient.start();
}
@Override
public void process(MessageAndMetadata<?, ?> mMData, List<Map> events) {
List<Map<String, Object>> listMap = new ArrayList<>();
// loop and extract the data from events into the above List
//..
//..
// submit to seperate thread to post to HTTP
pool.submit(new HttpThread(listMap);
}
private class HttpThread implements Callable<Boolean> {
List<Map<String, Object>> listMap = null;
public HttpThread(List<Map<String, Object>> listMap) {
this.listMap = listMap;
}
@Override
public Boolean call() throws Exception {
return postToHttp(listMap);
}
}
private Boolean postToHttp(List<Map<String, Object>> listMap) throws UnsupportedEncodingException {
for (Map<String, Object> map : listMap) {
try {
HttpPost postRequest = new HttpPost("https://myserver:8080/services/collector");
postRequest.addHeader(HttpHeaders.ACCEPT, "application/json");
postRequest.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
postRequest.addHeader(HttpHeaders.CONNECTION, "keep-alive");
StringEntity input = new StringEntity(methodToConvertMapToJSON(map));
input.setContentType("application/json");
postRequest.setEntity(input);
httpclient.execute(postRequest, null);
} catch (Exception e) {
return false;
} catch (Throwable th) {
return false;
}
}
return true;
}
}
würde ich vorschlagen, so dass [ 'verbose: gc'] (http://stackoverflow.com/questions/11889831/java-verbosegc-how-to-read- the-output) (und vielleicht Profiling) um zu bestimmen, * warum * du erschöpfst die Erinnerung. Ich könnte auch den 'HttpClient' in einen [Object Pool] (https://en.wikipedia.org/wiki/Object_pool_pattern) verschieben. –
Die Prozessorklasse wird nur einmal instanziiert, deshalb initiierte ich das httpClient-Objekt innerhalb des Beginns. Warum müssen wir es in den Object Pool verschieben? – jagamot
Und Sie haben nur ** einen Prozessor? Für 4.000 Nachrichten pro Sekunde? –