2016-06-04 7 views
2

Ich habe einen Prozessor, der Nachrichten von Apache Kafka liest und die Daten an einen REST-Endpunkt sendet.Apache HttpClient - Out of Memory Problem

Der Server hat nur 4-Kerne und 4 GB Hauptspeicher, aus dem ein Maximum von 2 GB an den Java-Prozess zugeordnet ist

Nachrichten erzeugt werden, und mit einer Geschwindigkeit von 4 K/Sekunde verbraucht.

Nach ein paar Minuten läuft das Programm aus dem Speicher.

  • Was ist der beste Weg http Rest Endpunkte asynchron rufen und nicht für die Antwort
  • warten Wie die Httpclient-Verbindung verwalten? Ich hatte den Eindruck, dass ich den Client ein nie starten müssen schließen, so kann ich die Verbindung wiederverwenden
  • Sie Probleme mit dem Code unten sehen Sie

public class SomeProcesor implementiert BProcessor {

private ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors.newFixedThreadPool(4); 
private CompletionService<Boolean> pool = new ExecutorCompletionService<Boolean>(exec); 
CloseableHttpAsyncClient httpclient = null ; 

@Override 
public void begin() { 
    httpclient = HttpAsyncClients.createDefault(); 
    RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(5000).setConnectTimeout(5000).setSocketTimeout(5000).build(); 
    HttpAsyncClients.custom().setDefaultRequestConfig(requestConfig).build(); 
    // Start the client 
    httpclient.start(); 

} 

@Override 
public void process(MessageAndMetadata<?, ?> mMData, List<Map> events) { 

    List<Map<String, Object>> listMap = new ArrayList<>(); 

    // loop and extract the data from events into the above List 
    //.. 
    //.. 

    // submit to seperate thread to post to HTTP 
    pool.submit(new HttpThread(listMap); 
} 

private class HttpThread implements Callable<Boolean> { 
    List<Map<String, Object>> listMap = null; 
    public HttpThread(List<Map<String, Object>> listMap) { 
     this.listMap = listMap; 
    } 
    @Override 
    public Boolean call() throws Exception { 
     return postToHttp(listMap); 
    } 
} 

private Boolean postToHttp(List<Map<String, Object>> listMap) throws UnsupportedEncodingException { 
    for (Map<String, Object> map : listMap) { 

     try { 
      HttpPost postRequest = new HttpPost("https://myserver:8080/services/collector"); 
      postRequest.addHeader(HttpHeaders.ACCEPT, "application/json"); 
      postRequest.addHeader(HttpHeaders.CONTENT_TYPE, "application/json"); 
      postRequest.addHeader(HttpHeaders.CONNECTION, "keep-alive"); 

      StringEntity input = new StringEntity(methodToConvertMapToJSON(map)); 
      input.setContentType("application/json"); 
      postRequest.setEntity(input); 

      httpclient.execute(postRequest, null); 

     } catch (Exception e) { 
      return false; 
     } catch (Throwable th) { 
      return false; 
     } 
    } 
    return true; 
} 

}

+0

würde ich vorschlagen, so dass [ 'verbose: gc'] (http://stackoverflow.com/questions/11889831/java-verbosegc-how-to-read- the-output) (und vielleicht Profiling) um zu bestimmen, * warum * du erschöpfst die Erinnerung. Ich könnte auch den 'HttpClient' in einen [Object Pool] (https://en.wikipedia.org/wiki/Object_pool_pattern) verschieben. –

+0

Die Prozessorklasse wird nur einmal instanziiert, deshalb initiierte ich das httpClient-Objekt innerhalb des Beginns. Warum müssen wir es in den Object Pool verschieben? – jagamot

+0

Und Sie haben nur ** einen Prozessor? Für 4.000 Nachrichten pro Sekunde? –

Antwort

2

müssen die HTTP-Antwort oder Release-Verbindung verbrauchen, andernfalls wird die Verbindung Ressourcen verbrauchen. Änderung

httpclient.execute(postRequest, null); 

zu

HttpResponse response = httpclient.execute(postRequest, null).get(); 
if(response.getStatusLine().getStatusCode() != 200) { 
// do something 
} 
// release the connection, better add to a finally clause 
postRequest.releaseConnection(); 
+0

meinst du releaseConnection() noch bevor die Antwort erhalten wird? Wird die ursprüngliche Anfrage nicht selbst abgebrochen? – jagamot

+0

Wenn ich auf die Antwort warte, wie wird es als asynchron betrachtet? – jagamot

+0

Die Ausführungsmethoden geben HttpResponse nicht zurück! – jagamot