
Out of memory error when writing Spark DataFrames in Parquet format

I am trying to query data from a database, apply some transformations to it, and save the new data to HDFS in Parquet format.

Since the database query returns a large number of rows, I fetch the data in batches and run the above process on each incoming batch.

UPDATE 2: The batch processing logic is:

import scala.collection.JavaConverters._ 

import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{StructType, StructField, StringType} 

class Batch(rows: List[String], 
      sqlContext: SQLContext) { 

     // The actual schema has around 60 fields 
     val schema = StructType(Array("name", "age", "address").map(field => 
         StructField(field, StringType, true) 
        )) 

     val transformedRows = rows.map(row => { 

       // transformation logic (returns Array[Array[String]] type) 

      }).map(row => Row.fromSeq(row.toSeq)) 

     val dataframe = sqlContext.createDataFrame(transformedRows.asJava, schema) 

} 

val sparkConf = new SparkConf().setAppName("Spark App") 
val sparkContext = new SparkContext(sparkConf) 
val sqlContext = new SQLContext(sparkContext) 

// Code to query database 
// queryResponse is essentially an iterator that fetches the next batch on calling queryResponse.next 

var batch_num = 0 

while (queryResponse.hasNext) { 
    val batch = queryResponse.next 

    val batchToSave = new Batch(
          batch.toList.map(_.getDocument.toString), 
          sqlContext) 

    batchToSave.dataframe.write.parquet(batch_num + "_Parquet") 

    batch_num += 1 

} 

My Spark version is 1.6.1 and the spark-submit command is:

spark-submit target/scala-2.10/Spark\ Application-assembly-1.0.jar 

The problem is that after a certain number of batches I get a java.lang.OutOfMemoryError.

The full stack trace is:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space 
    at java.util.Arrays.copyOfRange(Arrays.java:2694) 
    at java.lang.String.<init>(String.java:203) 
    at java.lang.StringBuilder.toString(StringBuilder.java:405) 
    at scala.StringContext.standardInterpolator(StringContext.scala:125) 
    at scala.StringContext.s(StringContext.scala:90) 
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:70) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56) 
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) 
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334) 
    at app.Application$.main(App.scala:156) 
    at app.Application.main(App.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

I have tried coalescing the data into a single partition, but that did not make any difference.

dataframe.coalesce(1).write.parquet(batch_num + "_Parquet") 

Any help would be appreciated.

UPDATE 1

Not doing the coalesce transformation on the RDD still gives an error, but the stack trace is now the following. It seems to be an issue with Parquet.

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1855) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1868) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1945) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56) 
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) 
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334) 
    at app.Application$.main(App.scala:156) 
    at app.Application.main(App.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.OutOfMemoryError: Java heap space 
    at org.apache.parquet.column.values.dictionary.IntList.initSlab(IntList.java:90) 
    at org.apache.parquet.column.values.dictionary.IntList.<init>(IntList.java:86) 
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93) 
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:229) 
    at org.apache.parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:131) 
    at org.apache.parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178) 
    at org.apache.parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203) 
    at org.apache.parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:83) 
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68) 
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56) 
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:183) 
    at org.apache.parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:375) 
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109) 
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99) 
    at org.apache.parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:100) 
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:303) 
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262) 
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetRelation.scala:94) 
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:286) 
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:255) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

No, no, no, no, no and no, do not coalesce - that is the opposite of what you want to do. – GameOfThrows


Possible duplicate of [How to deal with "java.lang.OutOfMemoryError: Java heap space" error (64MB heap size)](http://stackoverflow.com/questions/37335/how-to-deal-with-java-lang-outofmemoryerror-java-heap-space-error-64mb-heap) –


If your data (across all partitions, since you coalesce the data) is larger than the available memory, you get an OOM. In most cases, coalescing into a single partition is bad practice, because you lose most of the scalability benefits. –

Answer


I had the same problem today. It turned out that my execution plan was quite complex, and its toString generated 150 MB of information, which, combined with Scala's string interpolation, caused the driver to run out of memory.
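If you want to check whether the same thing is happening in your job, you can look at how large the plan string gets for one of your batches. A quick diagnostic sketch using the DataFrame from the question's loop (building the string allocates the memory itself, so only do this while debugging):

// Rough check of how large the textual query plan is for one batch 
val planText = batchToSave.dataframe.queryExecution.toString 
println(s"Plan string is ${planText.length} characters long") 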

You can try increasing the driver memory (I had to double it from 8 GB to 16 GB).
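For reference, driver memory is passed with spark-submit's --driver-memory flag; in client deploy mode, setting spark.driver.memory programmatically is too late because the driver JVM has already started. Based on the submit command from the question, it would look something like this (16g is just the value that worked for me, tune it to your cluster):

spark-submit --driver-memory 16g target/scala-2.10/Spark\ Application-assembly-1.0.jar 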