2016-06-08 23 views
0

Ich habe eine große CSV-Datei mit etwa 5,2 Millionen Zeilen. Ich möchte diese Datei analysieren und die Daten in eine Datenbank einfügen. Ich benutze Apache Kamel dafür.GC-Probleme beim Einfügen einer großen CSV-Datei mit Apache Camel

Der Weg ist ziemlich einfach (für dieses Beispiel vereinfacht)

from("file:work/customer/").id("customerDataRoute") 
.split(body().tokenize("\n")).streaming() 
.parallelProcessing() 
.unmarshal(bindyCustomer) 
.split(body()) 
.process(new CustomerProcessor()) 
.to("sql:INSERT INTO CUSTOMER_DATA(`FIELD1`,`FIELD2`) VALUES(#,#)"); 

bindyCustomer ist ein BindyCsvDataFormat für die CSV-Datei und CustomerProcessor ist ein Prozessor, der die Daten des Bindy Customer-Objekt als eine Reihe von Objekten zurückgibt für die SQL-Einfügung. Das eigentliche Objekt hat 39 Felder (oben vereinfacht).

Dies funktioniert alles in Ordnung für die ersten 800.000 bis 1.000.000 Linien, aber dann kommt es zum Stillstand.

Ich habe die Camel-Instanz mit JVisualVM und dem Visual GC-Plugin überwacht und sehe, dass die alte Generation voll wird und wenn sie das Maximum erreicht, kommt das ganze System zum Stillstand, aber es stürzt nicht ab. An diesem Punkt ist die alte Generation voll, der Eden-Raum ist fast voll und beide Survivor-Räume sind leer (da sie nichts an die alte Generation rühren können, denke ich).

Was ist hier falsch? Das sieht für mich wie ein Speicherleck in der Camel SQL Komponente aus. Die Daten werden hauptsächlich in ConcurrentHashMap-Objekten gespeichert.

Wenn ich die SQL-Komponente herausnehme, füllt sich die alte Generation kaum.

Ich benutze Camel 2.15.1 Wird versuchen, 2.17.1 zu verwenden, um zu sehen, ob das das Problem behebt.

Update: Ich habe Camel 2.17.1 (das gleiche Problem) versucht und ich habe versucht, die tun die Einsätze in Java mit java.sql.Statement.executeUPdate einfügen. Mit dieser Option konnte ich ungefähr 2.6 M Zeilen einfügen, aber dann hörte es auch auf. Das Lustige ist, dass ich keinen Speicherfehler bekomme. Es kommt einfach zum Stillstand.

Antwort

1

Okay, ich habe herausgefunden, was hier schief gelaufen ist. Grundsätzlich war der Leseteil im Vergleich zum Einsteckteil zu schnell. Das Beispiel war ein wenig vereinfacht, da zwischen dem Einlesen und Einfügen eine Seda-Queue vorhanden war (da ich eine Auswahl für den Inhalt treffen musste, der im Beispiel nicht gezeigt wurde). Aber auch ohne die seda Warteschlange war es nie fertig. Ich erkannte, was falsch war, als ich Kamel tötete und eine Nachricht erhielt, dass es noch einige tausend Flugnachrichten gab.

Es macht also keinen Sinn, das Lesen mit paralleler Verarbeitung durchzuführen, wenn die Kuvertier-Seite nicht mithalten kann.

from("file:work/customer/").id("customerDataRoute") 
     .onCompletion().log("Customer data processing finished").end() 
     .log("Processing customer data ${file:name}") 
     .split(body().tokenize("\n")).streaming() //no more parallel processing 
     .choice() 
      .when(simple("${body} contains 'HEADER TEXT'")) //strip out the header if it exists 
      .log("Skipping first line") 
      .endChoice() 
     .otherwise() 
      .to("seda:processCustomer?size=40&concurrentConsumers=20&blockWhenFull=true") 
      .endChoice(); 


from("seda:processCustomer?size=40&concurrentConsumers=20&blockWhenFull=true") 
      .unmarshal(bindyCustomer) 
      .split(body()) 
      .process(new CustomerProcessor()).id("CustomProcessor") //converts one Notification into an array of values for the SQL insert 
.to("sql:INSERT INTO CUSTOMER_DATA(`FIELD1`,`FIELD2`) VALUES(#,#)"); 

I definiert eine Größe in der SEDA Warteschlange (standardmäßig nicht darauf beschränkt ist) und den aufrufenden Thread Block gemacht, wenn die Warteschlange voll ist.

seda:processCustomer?size=40&concurrentConsumers=20&blockWhenFull=true 

Die parallele Verarbeitung erfolgt über 20 gleichzeitige Benutzer in der SEDA-Warteschlange.Bitte beachten Sie, dass Sie aus welchem ​​Grund auch immer die Warteschlangengröße beim Aufruf der Route angeben müssen (nicht nur dort, wo Sie sie definieren).

Jetzt ist der Speicherverbrauch minimal und es fügt die 5 Millionen Datensätze ohne Probleme ein.

1

Ich habe Ihren Code nicht getestet, jedoch habe ich festgestellt, dass Ihre zweite Split-Anweisung nicht streamt. Ich empfehle das zu versuchen. Wenn Sie zu viele parallele Arbeitsabläufe haben, könnte der GC auffüllen, bevor Sie die Ressourcen freigeben, die Sie sperren würden. Die Zeit, die die SQL-Anweisung benötigt, ist wahrscheinlich, was es dem GC ermöglicht, zu viel Aufbauzeit zu erhalten, da Sie die Hauptverarbeitung parallelisieren.

from("file:work/customer/").id("customerDataRoute") 
    .split(body().tokenize("\n")).streaming().parallelProcessing() 
     .unmarshal(bindyCustomer) 
     .split(body()).streaming() //Add a streaming call here and see what happens 
      .process(new CustomerProcessor()) 
      .to("sql:INSERT INTO CUSTOMER_DATA(`FIELD1`,`FIELD2`) VALUES(#,#)"); 
+0

Danke für den Tipp. Ich habe es versucht, aber es hat das Problem nicht gelöst. Der .unmarchal (bindyCustomer) gibt ein Array mit nur einem Element zurück, so dass das Streaming in diesem Fall sowieso keinen großen Unterschied gemacht hat. Kannst du an etwas anderes denken, das falsch sein könnte? Ich werde versuchen, die Einfügung in Java zu machen, um zu sehen, ob das das Problem behebt. – Ben

+0

Hmm Ich habe ein paar grobe Vermutungen. Wären Sie in der Lage, ID-Tags zu Ihrer Route hinzuzufügen und dann Ihre JConsole zu öffnen, um zu bestätigen, wo alle Threads "hängen" sind? –

+0

Die Route bereits als ID (customerDataRoute) oder beziehen Sie sich auf etwas anderes? – Ben